diff --git a/builder/src/chunkdict_generator.rs b/builder/src/chunkdict_generator.rs new file mode 100644 index 00000000000..4f7ab105d2b --- /dev/null +++ b/builder/src/chunkdict_generator.rs @@ -0,0 +1,280 @@ +// Copyright (C) 2023 Nydus Developers. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + +//! Generate Chunkdict RAFS bootstrap. +//! ------------------------------------------------------------------------------------------------- +//! Bug 1: Inconsistent Chunk Size Leading to Blob Size Less Than 4K(v6_block_size) +//! Description: The size of chunks is not consistent, which results in the possibility that a blob, +//! composed of a group of these chunks, may be less than 4K(v6_block_size) in size. +//! This inconsistency leads to a failure in passing the size check. +//! ------------------------------------------------------------------------------------------------- +//! Bug 2: Incorrect Chunk Number Calculation Due to Premature Check Logic +//! Description: The current logic for calculating the chunk number is based on the formula size/chunk size. +//! However, this approach is flawed as it precedes the actual check which accounts for chunk statistics. +//! Consequently, this leads to inaccurate counting of chunk numbers. + +use super::core::node::{ChunkSource, NodeInfo}; +use super::{BlobManager, Bootstrap, BootstrapManager, BuildContext, BuildOutput, Tree}; +use crate::core::node::Node; +use crate::NodeChunk; +use anyhow::Result; +use nydus_rafs::metadata::chunk::ChunkWrapper; +use nydus_rafs::metadata::inode::InodeWrapper; +use nydus_rafs::metadata::layout::RafsXAttrs; +use nydus_storage::meta::BlobChunkInfoV1Ondisk; +use nydus_utils::compress::Algorithm; +use nydus_utils::digest::RafsDigest; +use std::ffi::OsString; +use std::mem::size_of; +use std::path::PathBuf; +use std::str::FromStr; +use std::sync::Arc; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct ChunkdictChunkInfo { + pub image_reference: String, + pub version: String, + pub chunk_blob_id: String, + pub chunk_digest: String, + pub chunk_compressed_size: u32, + pub chunk_uncompressed_size: u32, + pub chunk_compressed_offset: u64, + pub chunk_uncompressed_offset: u64, +} + +pub struct ChunkdictBlobInfo { + pub blob_id: String, + pub blob_compressed_size: u64, + pub blob_uncompressed_size: u64, + pub blob_compressor: String, + pub blob_meta_ci_compressed_size: u64, + pub blob_meta_ci_uncompressed_size: u64, + pub blob_meta_ci_offset: u64, +} + +/// Struct to generate chunkdict RAFS bootstrap. +pub struct Generator {} + +impl Generator { + // Generate chunkdict RAFS bootstrap. + pub fn generate( + ctx: &mut BuildContext, + bootstrap_mgr: &mut BootstrapManager, + blob_mgr: &mut BlobManager, + chunkdict_chunks_origin: Vec, + chunkdict_blobs: Vec, + ) -> Result { + // Validate and remove chunks whose belonged blob sizes are smaller than a block. + let mut chunkdict_chunks = chunkdict_chunks_origin.to_vec(); + Self::validate_and_remove_chunks(ctx, &mut chunkdict_chunks); + // Build root tree. + let mut tree = Self::build_root_tree(ctx)?; + + // Build child tree. + let child = Self::build_child_tree(ctx, blob_mgr, &chunkdict_chunks, &chunkdict_blobs)?; + let result = vec![child]; + tree.children = result; + + Self::validate_tree(&tree)?; + + // Build bootstrap. 
+ let mut bootstrap_ctx = bootstrap_mgr.create_ctx()?; + let mut bootstrap = Bootstrap::new(tree)?; + bootstrap.build(ctx, &mut bootstrap_ctx)?; + + let blob_table = blob_mgr.to_blob_table(ctx)?; + let storage = &mut bootstrap_mgr.bootstrap_storage; + bootstrap.dump(ctx, storage, &mut bootstrap_ctx, &blob_table)?; + + BuildOutput::new(blob_mgr, &bootstrap_mgr.bootstrap_storage) + } + + /// Validate tree. + fn validate_tree(tree: &Tree) -> Result<()> { + let pre = &mut |t: &Tree| -> Result<()> { + let node = t.lock_node(); + debug!("chunkdict tree: "); + debug!("inode: {}", node); + for chunk in &node.chunks { + debug!("\t chunk: {}", chunk); + } + Ok(()) + }; + tree.walk_dfs_pre(pre)?; + debug!("chunkdict tree is valid."); + Ok(()) + } + + /// Validates and removes chunks with a total uncompressed size smaller than the block size limit. + fn validate_and_remove_chunks(ctx: &mut BuildContext, chunkdict: &mut Vec) { + let mut chunk_sizes = std::collections::HashMap::new(); + + // Accumulate the uncompressed size for each chunk_blob_id. + for chunk in chunkdict.iter() { + *chunk_sizes.entry(chunk.chunk_blob_id.clone()).or_insert(0) += + chunk.chunk_uncompressed_size as u64; + } + // Find all chunk_blob_ids with a total uncompressed size > v6_block_size. + let small_chunks: Vec = chunk_sizes + .into_iter() + .filter(|&(_, size)| size < ctx.v6_block_size()) + .inspect(|(id, _)| { + eprintln!( + "Warning: Blob with id '{}' is smaller than {} bytes.", + id, + ctx.v6_block_size() + ) + }) + .map(|(id, _)| id) + .collect(); + + // Retain only chunks with chunk_blob_id that has a total uncompressed size > v6_block_size. + chunkdict.retain(|chunk| !small_chunks.contains(&chunk.chunk_blob_id)); + } + + /// Build the root tree. + pub fn build_root_tree(ctx: &mut BuildContext) -> Result { + let mut inode = InodeWrapper::new(ctx.fs_version); + inode.set_ino(1); + inode.set_uid(1000); + inode.set_gid(1000); + inode.set_projid(0); + inode.set_mode(0o660 | libc::S_IFDIR as u32); + inode.set_nlink(3); + inode.set_name_size("/".len()); + inode.set_rdev(0); + inode.set_blocks(256); + let node_info = NodeInfo { + explicit_uidgid: true, + src_dev: 0, + src_ino: 0, + rdev: 0, + source: PathBuf::from("/"), + path: PathBuf::from("/"), + target: PathBuf::from("/"), + target_vec: vec![OsString::from("/")], + symlink: None, + xattrs: RafsXAttrs::default(), + v6_force_extended_inode: true, + }; + let root_node = Node::new(inode, node_info, 0); + let tree = Tree::new(root_node); + Ok(tree) + } + + /// Build the child tree. + fn build_child_tree( + ctx: &mut BuildContext, + blob_mgr: &mut BlobManager, + chunkdict_chunks: &[ChunkdictChunkInfo], + chunkdict_blobs: &[ChunkdictBlobInfo], + ) -> Result { + let mut inode = InodeWrapper::new(ctx.fs_version); + inode.set_ino(2); + inode.set_uid(0); + inode.set_gid(0); + inode.set_projid(0); + inode.set_mode(0o660 | libc::S_IFREG as u32); + inode.set_nlink(1); + inode.set_name_size("chunkdict".len()); + inode.set_rdev(0); + inode.set_blocks(256); + let node_info = NodeInfo { + explicit_uidgid: true, + src_dev: 0, + src_ino: 1, + rdev: 0, + source: PathBuf::from("/"), + path: PathBuf::from("/chunkdict"), + target: PathBuf::from("/chunkdict"), + target_vec: vec![OsString::from("/"), OsString::from("/chunkdict")], + symlink: None, + xattrs: RafsXAttrs::new(), + v6_force_extended_inode: true, + }; + let mut node = Node::new(inode, node_info, 0); + + // Insert chunks. 
+ Self::insert_chunks(ctx, blob_mgr, &mut node, chunkdict_chunks, chunkdict_blobs)?; + let node_size: u64 = node + .chunks + .iter() + .map(|chunk| chunk.inner.uncompressed_size() as u64) + .sum(); + node.inode.set_size(node_size); + + // Update child count. + node.inode.set_child_count(node.chunks.len() as u32); + let child = Tree::new(node); + child + .lock_node() + .v5_set_dir_size(ctx.fs_version, &child.children); + Ok(child) + } + + /// Insert chunks. + fn insert_chunks( + ctx: &mut BuildContext, + blob_mgr: &mut BlobManager, + node: &mut Node, + chunkdict_chunks: &[ChunkdictChunkInfo], + chunkdict_blobs: &[ChunkdictBlobInfo], + ) -> Result<()> { + for (index, chunk_info) in chunkdict_chunks.iter().enumerate() { + let chunk_size: u32 = chunk_info.chunk_compressed_size; + let file_offset = index as u64 * chunk_size as u64; + let mut chunk = ChunkWrapper::new(ctx.fs_version); + + // Update blob context. + let (blob_index, blob_ctx) = + blob_mgr.get_or_cerate_blob_for_chunkdict(ctx, &chunk_info.chunk_blob_id)?; + let chunk_uncompressed_size = chunk_info.chunk_uncompressed_size; + let pre_d_offset = blob_ctx.current_uncompressed_offset; + blob_ctx.uncompressed_blob_size = pre_d_offset + chunk_uncompressed_size as u64; + blob_ctx.current_uncompressed_offset += chunk_uncompressed_size as u64; + + blob_ctx.blob_meta_header.set_ci_uncompressed_size( + blob_ctx.blob_meta_header.ci_uncompressed_size() + + size_of::() as u64, + ); + blob_ctx.blob_meta_header.set_ci_compressed_size( + blob_ctx.blob_meta_header.ci_uncompressed_size() + + size_of::() as u64, + ); + let chunkdict_blob_info = chunkdict_blobs + .iter() + .find(|blob| blob.blob_id == chunk_info.chunk_blob_id) + .unwrap(); + blob_ctx.blob_compressor = + Algorithm::from_str(chunkdict_blob_info.blob_compressor.as_str())?; + blob_ctx + .blob_meta_header + .set_ci_uncompressed_size(chunkdict_blob_info.blob_meta_ci_uncompressed_size); + blob_ctx + .blob_meta_header + .set_ci_compressed_size(chunkdict_blob_info.blob_meta_ci_compressed_size); + blob_ctx + .blob_meta_header + .set_ci_compressed_offset(chunkdict_blob_info.blob_meta_ci_offset); + blob_ctx.blob_meta_header.set_ci_compressor(Algorithm::Zstd); + + // Update chunk context. + let chunk_index = blob_ctx.alloc_chunk_index()?; + chunk.set_blob_index(blob_index); + chunk.set_index(chunk_index); + chunk.set_file_offset(file_offset); + chunk.set_compressed_size(chunk_info.chunk_compressed_size); + chunk.set_compressed_offset(chunk_info.chunk_compressed_offset); + chunk.set_uncompressed_size(chunk_info.chunk_uncompressed_size); + chunk.set_uncompressed_offset(chunk_info.chunk_uncompressed_offset); + chunk.set_id(RafsDigest::from_string(&chunk_info.chunk_digest)); + + node.chunks.push(NodeChunk { + source: ChunkSource::Build, + inner: Arc::new(chunk.clone()), + }); + } + Ok(()) + } +} diff --git a/builder/src/core/context.rs b/builder/src/core/context.rs index 14f33db855c..eb7a77728c8 100644 --- a/builder/src/core/context.rs +++ b/builder/src/core/context.rs @@ -597,6 +597,9 @@ impl BlobContext { blob_ctx .blob_meta_header .set_encrypted(features.contains(BlobFeatures::ENCRYPTED)); + blob_ctx + .blob_meta_header + .set_is_chunkdict_generated(features.contains(BlobFeatures::IS_CHUNKDICT_GENERATED)); blob_ctx } @@ -955,6 +958,32 @@ impl BlobManager { } } + /// Get or cerate blob for chunkdict, this is used for chunk deduplication. 
+ pub fn get_or_cerate_blob_for_chunkdict( + &mut self, + ctx: &BuildContext, + id: &str, + ) -> Result<(u32, &mut BlobContext)> { + if self.get_blob_idx_by_id(id).is_none() { + let blob_ctx = Self::new_blob_ctx(ctx)?; + self.current_blob_index = Some(self.alloc_index()?); + self.add_blob(blob_ctx); + } else { + self.current_blob_index = self.get_blob_idx_by_id(id); + } + let (_, blob_ctx) = self.get_current_blob().unwrap(); + if blob_ctx.blob_id.is_empty() { + blob_ctx.blob_id = id.to_string(); + } + // Safe to unwrap because the blob context has been added. + Ok(self.get_current_blob().unwrap()) + } + + /// Determine if the given blob has been created. + pub fn has_blob(&self, blob_id: &str) -> bool { + self.get_blob_idx_by_id(blob_id).is_some() + } + /// Set the global chunk dictionary for chunk deduplication. pub fn set_chunk_dict(&mut self, dict: Arc) { self.global_chunk_dict = dict @@ -1097,6 +1126,7 @@ impl BlobManager { compressed_blob_size, blob_features, flags, + build_ctx.is_chunkdict_generated, ); } RafsBlobTable::V6(table) => { @@ -1116,6 +1146,7 @@ impl BlobManager { ctx.blob_toc_digest, ctx.blob_meta_size, ctx.blob_toc_size, + build_ctx.is_chunkdict_generated, ctx.blob_meta_header, ctx.cipher_object.clone(), ctx.cipher_ctx.clone(), @@ -1293,6 +1324,9 @@ pub struct BuildContext { pub configuration: Arc, /// Generate the blob cache and blob meta pub blob_cache_generator: Option, + + /// Whether is chunkdict. + pub is_chunkdict_generated: bool, } impl BuildContext { @@ -1361,6 +1395,7 @@ impl BuildContext { features, configuration: Arc::new(ConfigV2::default()), blob_cache_generator: None, + is_chunkdict_generated: false, } } @@ -1379,6 +1414,10 @@ impl BuildContext { pub fn set_configuration(&mut self, config: Arc) { self.configuration = config; } + + pub fn set_is_chunkdict(&mut self, is_chunkdict: bool) { + self.is_chunkdict_generated = is_chunkdict; + } } impl Default for BuildContext { @@ -1411,6 +1450,7 @@ impl Default for BuildContext { features: Features::new(), configuration: Arc::new(ConfigV2::default()), blob_cache_generator: None, + is_chunkdict_generated: false, } } } diff --git a/builder/src/lib.rs b/builder/src/lib.rs index 7d785ea3f88..54f47e264a7 100644 --- a/builder/src/lib.rs +++ b/builder/src/lib.rs @@ -23,6 +23,9 @@ use sha2::Digest; use self::core::node::{Node, NodeInfo}; +pub use self::chunkdict_generator::ChunkdictBlobInfo; +pub use self::chunkdict_generator::ChunkdictChunkInfo; +pub use self::chunkdict_generator::Generator; pub use self::compact::BlobCompactor; pub use self::core::bootstrap::Bootstrap; pub use self::core::chunk_dict::{parse_chunk_dict_arg, ChunkDict, HashChunkDict}; @@ -40,6 +43,7 @@ pub use self::merge::Merger; pub use self::stargz::StargzBuilder; pub use self::tarball::TarballBuilder; +mod chunkdict_generator; mod compact; mod core; mod directory; diff --git a/contrib/nydusify/cmd/nydusify.go b/contrib/nydusify/cmd/nydusify.go index 4849feaa64d..a4ff5856780 100644 --- a/contrib/nydusify/cmd/nydusify.go +++ b/contrib/nydusify/cmd/nydusify.go @@ -657,12 +657,45 @@ func main() { Usage: "One or more Nydus image reference(Multiple images should be split by commas)", EnvVars: []string{"SOURCES"}, }, + &cli.StringFlag{ + Name: "target", + Required: false, + Usage: "Target chunkdict image (Nydus) reference", + EnvVars: []string{"TARGET"}, + }, &cli.BoolFlag{ Name: "source-insecure", Required: false, Usage: "Skip verifying server certs for HTTPS source registry", EnvVars: []string{"SOURCE_INSECURE"}, }, + &cli.BoolFlag{ + Name: 
"target-insecure", + Required: false, + Usage: "Skip verifying server certs for HTTPS target registry", + EnvVars: []string{"TARGET_INSECURE"}, + }, + + &cli.StringFlag{ + Name: "backend-type", + Value: "", + Usage: "Type of storage backend, possible values: 'oss', 's3'", + EnvVars: []string{"BACKEND_TYPE"}, + }, + &cli.StringFlag{ + Name: "backend-config", + Value: "", + Usage: "Json configuration string for storage backend", + EnvVars: []string{"BACKEND_CONFIG"}, + }, + &cli.PathFlag{ + Name: "backend-config-file", + Value: "", + TakesFile: true, + Usage: "Json configuration file for storage backend", + EnvVars: []string{"BACKEND_CONFIG_FILE"}, + }, + &cli.StringFlag{ Name: "work-dir", Value: "./output", @@ -675,6 +708,12 @@ func main() { Usage: "Path to the nydus-image binary, default to search in PATH", EnvVars: []string{"NYDUS_IMAGE"}, }, + + &cli.BoolFlag{ + Name: "all-platforms", + Value: false, + Usage: "Generate chunkdict image for all platforms, conflicts with --platform", + }, &cli.StringFlag{ Name: "platform", Value: "linux/" + runtime.GOARCH, @@ -684,17 +723,31 @@ func main() { Action: func(c *cli.Context) error { setupLogLevel(c) + backendType, backendConfig, err := getBackendConfig(c, "", false) + if err != nil { + return err + } + _, arch, err := provider.ExtractOsArch(c.String("platform")) if err != nil { return err } generator, err := generator.New(generator.Opt{ - WorkDir: c.String("work-dir"), Sources: c.StringSlice("sources"), + Target: c.String("target"), SourceInsecure: c.Bool("source-insecure"), + TargetInsecure: c.Bool("target-insecure"), + + BackendType: backendType, + BackendConfig: backendConfig, + BackendForcePush: c.Bool("backend-force-push"), + + WorkDir: c.String("work-dir"), NydusImagePath: c.String("nydus-image"), ExpectedArch: arch, + AllPlatforms: c.Bool("all-platforms"), + Platforms: c.String("platform"), }) if err != nil { return err diff --git a/contrib/nydusify/pkg/build/builder.go b/contrib/nydusify/pkg/build/builder.go index 564f57454ae..177c0b9a209 100644 --- a/contrib/nydusify/pkg/build/builder.go +++ b/contrib/nydusify/pkg/build/builder.go @@ -41,8 +41,11 @@ type CompactOption struct { CompactConfigPath string } -type SaveOption struct { - BootstrapPath string +type GenerateOption struct { + BootstrapPaths []string + DatabasePath string + ChunkdictBootstrapPath string + OutputPath string } type Builder struct { @@ -148,15 +151,22 @@ func (builder *Builder) Run(option BuilderOption) error { return builder.run(args, option.PrefetchPatterns) } -// Save calls `nydus-image chunkdict save` to parse Nydus bootstrap -func (builder *Builder) Save(option SaveOption) error { +// Generate calls `nydus-image chunkdict generate` to get chunkdict +func (builder *Builder) Generate(option GenerateOption) error { + logrus.Infof("Invoking 'nydus-image chunkdict generate' command") args := []string{ "chunkdict", - "save", + "generate", "--log-level", "warn", "--bootstrap", - option.BootstrapPath, + option.ChunkdictBootstrapPath, + "--database", + option.DatabasePath, + "--output-json", + option.OutputPath, } + args = append(args, option.BootstrapPaths...) 
+ return builder.run(args, "") } diff --git a/contrib/nydusify/pkg/chunkdict/generator/generator.go b/contrib/nydusify/pkg/chunkdict/generator/generator.go index c9472c459cc..13c91a9ca58 100644 --- a/contrib/nydusify/pkg/chunkdict/generator/generator.go +++ b/contrib/nydusify/pkg/chunkdict/generator/generator.go @@ -1,7 +1,10 @@ package generator import ( + "compress/gzip" "context" + "encoding/json" + "io" "io/fs" "os" "path/filepath" @@ -10,20 +13,47 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" + "github.com/containerd/containerd/namespaces" + "github.com/dragonflyoss/nydus/contrib/nydusify/pkg/backend" "github.com/dragonflyoss/nydus/contrib/nydusify/pkg/build" "github.com/dragonflyoss/nydus/contrib/nydusify/pkg/parser" - "github.com/dragonflyoss/nydus/contrib/nydusify/pkg/provider" + originprovider "github.com/dragonflyoss/nydus/contrib/nydusify/pkg/provider" + "github.com/goharbor/acceleration-service/pkg/remote" + + "github.com/containerd/nydus-snapshotter/pkg/converter" + "github.com/dragonflyoss/nydus/contrib/nydusify/pkg/converter/provider" "github.com/dragonflyoss/nydus/contrib/nydusify/pkg/utils" + "github.com/dustin/go-humanize" + "github.com/goharbor/acceleration-service/pkg/platformutil" + serverutils "github.com/goharbor/acceleration-service/pkg/utils" + "github.com/opencontainers/go-digest" + "golang.org/x/sync/errgroup" + "golang.org/x/sync/semaphore" + + "github.com/containerd/containerd/content" + containerdErrdefs "github.com/containerd/containerd/errdefs" + "github.com/goharbor/acceleration-service/pkg/errdefs" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" ) // Opt defines Chunkdict generate options. // Note: sources is one or more Nydus image references. type Opt struct { - WorkDir string Sources []string + Target string SourceInsecure bool + TargetInsecure bool + + BackendType string + BackendConfig string + BackendForcePush bool + + WorkDir string NydusImagePath string ExpectedArch string + + AllPlatforms bool + Platforms string } // Generator generates chunkdict by deduplicating multiple nydus images @@ -33,12 +63,16 @@ type Generator struct { sourcesParser []*parser.Parser } +type output struct { + Blobs []string +} + // New creates Generator instance. func New(opt Opt) (*Generator, error) { // TODO: support sources image resolver var sourcesParser []*parser.Parser for _, source := range opt.Sources { - sourcesRemote, err := provider.DefaultRemote(source, opt.SourceInsecure) + sourcesRemote, err := originprovider.DefaultRemote(source, opt.SourceInsecure) if err != nil { return nil, errors.Wrap(err, "Init source image parser") } @@ -59,48 +93,435 @@ func New(opt Opt) (*Generator, error) { // Generate saves multiple Nydus bootstraps into the database one by one. 
func (generator *Generator) Generate(ctx context.Context) error { - for index := range generator.Sources { - if err := generator.save(ctx, index); err != nil { - if utils.RetryWithHTTP(err) { + var bootstrapPaths []string + bootstrapPaths, err := generator.pull(ctx) + + if err != nil { + if utils.RetryWithHTTP(err) { + for index := range generator.Sources { generator.sourcesParser[index].Remote.MaybeWithHTTP(err) } - if err := generator.save(ctx, index); err != nil { - return err - } + } + bootstrapPaths, err = generator.pull(ctx) + if err != nil { + return err } } - return nil -} -// "save" stores information of chunk and blob of a Nydus Image in the database -func (generator *Generator) save(ctx context.Context, index int) error { - sourceParsed, err := generator.sourcesParser[index].Parse(ctx) + chunkdictBootstrapPath, outputPath, err := generator.generate(ctx, bootstrapPaths) if err != nil { - return errors.Wrap(err, "parse Nydus image") + return err } - // Create a directory to store the image bootstrap - nydusImageName := strings.Replace(generator.Sources[index], "/", ":", -1) - folderPath := filepath.Join(generator.WorkDir, nydusImageName) - if err := os.MkdirAll(folderPath, fs.ModePerm); err != nil { - return errors.Wrap(err, "creat work directory") + if err := generator.push(ctx, chunkdictBootstrapPath, outputPath); err != nil { + return err } - if err := generator.Output(ctx, sourceParsed, folderPath, index); err != nil { - return errors.Wrap(err, "output image information") + + // return os.RemoveAll(generator.WorkDir) + return nil +} + +// Pull the bootstrap of nydus image +func (generator *Generator) pull(ctx context.Context) ([]string, error) { + var bootstrapPaths []string + for index := range generator.Sources { + sourceParsed, err := generator.sourcesParser[index].Parse(ctx) + if err != nil { + return nil, errors.Wrap(err, "parse Nydus image") + } + + // Create a directory to store the image bootstrap + nydusImageName := strings.Replace(generator.Sources[index], "/", ":", -1) + bootstrapDirPath := filepath.Join(generator.WorkDir, nydusImageName) + if err := os.MkdirAll(bootstrapDirPath, fs.ModePerm); err != nil { + return nil, errors.Wrap(err, "creat work directory") + } + if err := generator.Output(ctx, sourceParsed, bootstrapDirPath, index); err != nil { + return nil, errors.Wrap(err, "output image information") + } + bootstrapPath := filepath.Join(bootstrapDirPath, "nydus_bootstrap") + bootstrapPaths = append(bootstrapPaths, bootstrapPath) } + return bootstrapPaths, nil +} - // Invoke "nydus-image save" command +func (generator *Generator) generate(_ context.Context, bootstrapSlice []string) (string, string, error) { + // Invoke "nydus-image chunkdict generate" command + currentDir, _ := os.Getwd() builder := build.NewBuilder(generator.NydusImagePath) - if err := builder.Save(build.SaveOption{ - BootstrapPath: filepath.Join(folderPath, "nydus_bootstrap"), + + chunkdictBootstrapPath := filepath.Join(generator.WorkDir, "chunkdict_bootstrap") + databaseType := "sqlite" + var databasePath string + if strings.HasPrefix(generator.WorkDir, "/") { + databasePath = databaseType + "://" + filepath.Join(generator.WorkDir, "database.db") + } else { + databasePath = databaseType + "://" + filepath.Join(currentDir, generator.WorkDir, "database.db") + } + outputPath := filepath.Join(generator.WorkDir, "nydus_bootstrap_output.json") + + if err := builder.Generate(build.GenerateOption{ + BootstrapPaths: bootstrapSlice, + ChunkdictBootstrapPath: chunkdictBootstrapPath, + DatabasePath: 
databasePath, + OutputPath: outputPath, }); err != nil { - return errors.Wrap(err, "invalid nydus bootstrap format") + return "", "", errors.Wrap(err, "invalid nydus bootstrap format") + } + + logrus.Infof("Successfully generate image chunk dictionary") + return chunkdictBootstrapPath, outputPath, nil +} + +func hosts(generator *Generator) remote.HostFunc { + maps := make(map[string]bool) + for _, source := range generator.Sources { + maps[source] = generator.SourceInsecure + } + + maps[generator.Target] = generator.TargetInsecure + return func(ref string) (remote.CredentialFunc, bool, error) { + return remote.NewDockerConfigCredFunc(), maps[ref], nil + } +} + +func (generator *Generator) push(ctx context.Context, chunkdictBootstrapPath string, outputPath string) error { + // Basic configuration + ctx = namespaces.WithNamespace(ctx, "nydusify") + platformMC, err := platformutil.ParsePlatforms(generator.AllPlatforms, generator.Platforms) + if err != nil { + return err + } + + pvd, err := provider.New(generator.WorkDir, hosts(generator), 200, "v1", platformMC, 0) + if err != nil { + return err + } + + var bkd backend.Backend + if generator.BackendType != "" { + bkd, err = backend.NewBackend(generator.BackendType, []byte(generator.BackendConfig), nil) + if err != nil { + return errors.Wrapf(err, "new backend") + } + } + + // Pull source image + for index := range generator.Sources { + if err := pvd.Pull(ctx, generator.Sources[index]); err != nil { + if errdefs.NeedsRetryWithHTTP(err) { + pvd.UsePlainHTTP() + if err := pvd.Pull(ctx, generator.Sources[index]); err != nil { + return errors.Wrap(err, "try to pull image") + } + } else { + return errors.Wrap(err, "pull source image") + } + } + } + + logrus.Infof("pulled source image %s", generator.Sources[0]) + sourceImage, err := pvd.Image(ctx, generator.Sources[0]) + if err != nil { + return errors.Wrap(err, "find image from store") } + sourceDescs, err := serverutils.GetManifests(ctx, pvd.ContentStore(), *sourceImage, platformMC) + if err != nil { + return errors.Wrap(err, "get image manifests") + } + + targetDescs := make([]ocispec.Descriptor, len(sourceDescs)) + + sem := semaphore.NewWeighted(1) + eg := errgroup.Group{} + for idx := range sourceDescs { + func(idx int) { + eg.Go(func() error { + sem.Acquire(context.Background(), 1) + defer sem.Release(1) + sourceDesc := sourceDescs[idx] + targetDesc := &sourceDesc - logrus.Infof("Save chunk information from image %s", generator.sourcesParser[index].Remote.Ref) + // Get the blob from backend + descs, _targetDesc, err := pushBlobFromBackend(ctx, pvd, bkd, sourceDesc, *generator, chunkdictBootstrapPath, outputPath) + if err != nil { + return errors.Wrap(err, "get resolver") + } + if _targetDesc != nil { + targetDesc = _targetDesc + store := newStore(pvd.ContentStore(), descs) + pvd.SetContentStore(store) + } - if err := os.RemoveAll(folderPath); err != nil { - return errors.Wrap(err, "remove work directory") + targetDescs[idx] = *targetDesc + + if err := pvd.Push(ctx, *targetDesc, generator.Target); err != nil { + if errdefs.NeedsRetryWithHTTP(err) { + pvd.UsePlainHTTP() + if err := pvd.Push(ctx, *targetDesc, generator.Target); err != nil { + return errors.Wrap(err, "try to push image manifest") + } + } else { + return errors.Wrap(err, "push target image manifest") + } + } + return nil + }) + }(idx) + } + if err := eg.Wait(); err != nil { + return errors.Wrap(err, "push image manifests") } return nil } + +func pushBlobFromBackend( + ctx context.Context, pvd *provider.Provider, bkd backend.Backend, 
src ocispec.Descriptor, generator Generator, bootstrapPath string, outputPath string, +) ([]ocispec.Descriptor, *ocispec.Descriptor, error) { + manifest := ocispec.Manifest{} + if _, err := serverutils.ReadJSON(ctx, pvd.ContentStore(), &manifest, src); err != nil { + return nil, nil, errors.Wrap(err, "read manifest from store") + } + fsversion := src.Annotations["containerd.io/snapshot/nydus-fs-version"] + // Read the Nydusify output JSON to get the list of blobs + var out output + bytes, err := os.ReadFile(outputPath) + if err != nil { + return nil, nil, errors.Wrap(err, "read output file") + } + if err := json.Unmarshal(bytes, &out); err != nil { + return nil, nil, errors.Wrap(err, "unmarshal output json") + } + + blobIDs := []string{} + blobIDMap := map[string]bool{} + for _, blobID := range out.Blobs { + if blobIDMap[blobID] { + continue + } + blobIDs = append(blobIDs, blobID) + blobIDMap[blobID] = true + } + blobDescs := make([]ocispec.Descriptor, len(blobIDs)) + + eg, ctx := errgroup.WithContext(ctx) + sem := semaphore.NewWeighted(int64(provider.LayerConcurrentLimit)) + for idx := range blobIDs { + func(idx int) { + eg.Go(func() error { + sem.Acquire(context.Background(), 1) + defer sem.Release(1) + + blobID := blobIDs[idx] + blobDigest := digest.Digest("sha256:" + blobID) + + var blobSize int64 + var rc io.ReadCloser + + if bkd != nil { + rc, err = bkd.Reader(blobID) + if err != nil { + return errors.Wrap(err, "get blob reader") + } + blobSize, err = bkd.Size(blobID) + if err != nil { + return errors.Wrap(err, "get blob size") + } + } else { + imageDesc, err := generator.sourcesParser[0].Remote.Resolve(ctx) + if err != nil { + if strings.Contains(err.Error(), "x509: certificate signed by unknown authority") { + logrus.Warningln("try to enable \"--source-insecure\" / \"--target-insecure\" option") + } + return errors.Wrap(err, "resolve image") + } + rc, err = generator.sourcesParser[0].Remote.Pull(ctx, *imageDesc, true) + if err != nil { + return errors.Wrap(err, "get blob reader") + } + blobInfo, err := pvd.ContentStore().Info(ctx, blobDigest) + if err != nil { + return errors.Wrap(err, "get info from content store") + } + blobSize = blobInfo.Size + } + defer rc.Close() + + blobSizeStr := humanize.Bytes(uint64(blobSize)) + logrus.WithField("digest", blobDigest).WithField("size", blobSizeStr).Infof("pushing blob from backend") + + blobDescs[idx] = ocispec.Descriptor{ + Digest: blobDigest, + Size: blobSize, + MediaType: converter.MediaTypeNydusBlob, + Annotations: map[string]string{ + converter.LayerAnnotationNydusBlob: "true", + }, + } + writer, err := getPushWriter(ctx, pvd, blobDescs[idx], generator.Opt) + if err != nil { + if errdefs.NeedsRetryWithHTTP(err) { + pvd.UsePlainHTTP() + writer, err = getPushWriter(ctx, pvd, blobDescs[idx], generator.Opt) + } + if err != nil { + return errors.Wrap(err, "get push writer") + } + } + if writer != nil { + defer writer.Close() + return content.Copy(ctx, writer, rc, blobSize, blobDigest) + } + + logrus.WithField("digest", blobDigest).WithField("size", blobSizeStr).Infof("pushed blob from backend") + + return nil + + }) + }(idx) + } + + if err := eg.Wait(); err != nil { + return nil, nil, errors.Wrap(err, "push blobs") + } + + // Update manifest blob layers + manifest.Layers = nil + manifest.Layers = append(blobDescs, manifest.Layers...) 
+ + // Update bootstrap + cw, err := content.OpenWriter(ctx, pvd.ContentStore(), content.WithRef("merge-bootstrap")) + if err != nil { + return nil, nil, errors.Wrap(err, "open content store writer") + } + defer cw.Close() + + bootstrapPathTar := "image/image.boot" + rc, err := utils.PackTargz(bootstrapPath, bootstrapPathTar, false) + if err != nil { + return nil, nil, errors.Wrap(err, "get bootstrap reader") + } + defer rc.Close() + + gw := gzip.NewWriter(cw) + uncompressedDgst := digest.SHA256.Digester() + compressed := io.MultiWriter(gw, uncompressedDgst.Hash()) + + buffer := make([]byte, 32*1024) + if _, err := io.CopyBuffer(compressed, rc, buffer); err != nil { + return nil, nil, errors.Wrapf(err, "copy bootstrap targz into content store") + } + if err := gw.Close(); err != nil { + return nil, nil, errors.Wrap(err, "close gzip writer") + } + + compressedDgst := cw.Digest() + if err := cw.Commit(ctx, 0, compressedDgst, content.WithLabels(map[string]string{ + "containerd.io/uncompressed": uncompressedDgst.Digest().String(), + })); err != nil { + if !containerdErrdefs.IsAlreadyExists(err) { + return nil, nil, errors.Wrap(err, "commit to content store") + } + } + if err := cw.Close(); err != nil { + return nil, nil, errors.Wrap(err, "close content store writer") + } + + bootstrapInfo, err := pvd.ContentStore().Info(ctx, compressedDgst) + if err != nil { + return nil, nil, errors.Wrap(err, "get info from content store") + } + bootstrapSize := bootstrapInfo.Size + + bootstrapDesc := ocispec.Descriptor{ + Digest: compressedDgst, + Size: bootstrapSize, + MediaType: "application/vnd.docker.image.rootfs.diff.tar.gzip", + Annotations: map[string]string{ + "containerd.io/snapshot/nydus-bootstrap": "true", + "containerd.io/snapshot/nydus-fs-version": fsversion, + }, + } + manifest.Layers = append(manifest.Layers, bootstrapDesc) + + // Update image config + blobDigests := []digest.Digest{} + for idx := range blobDescs { + blobDigests = append(blobDigests, blobDescs[idx].Digest) + } + + config := ocispec.Image{} + if _, err := serverutils.ReadJSON(ctx, pvd.ContentStore(), &config, manifest.Config); err != nil { + return nil, nil, errors.Wrap(err, "read config json") + } + config.RootFS.DiffIDs = nil + config.RootFS.DiffIDs = append(blobDigests, config.RootFS.DiffIDs...) 
+ config.RootFS.DiffIDs = append(config.RootFS.DiffIDs, digest.Digest(uncompressedDgst.Digest().String())) + configDesc, err := serverutils.WriteJSON(ctx, pvd.ContentStore(), config, manifest.Config, generator.Target, nil) + if err != nil { + return nil, nil, errors.Wrap(err, "write config json") + } + manifest.Config = *configDesc + target, err := serverutils.WriteJSON(ctx, pvd.ContentStore(), &manifest, src, generator.Target, nil) + if err != nil { + return nil, nil, errors.Wrap(err, "write manifest json") + } + + return blobDescs, target, nil +} + +func getPushWriter(ctx context.Context, pvd *provider.Provider, desc ocispec.Descriptor, opt Opt) (content.Writer, error) { + resolver, err := pvd.Resolver(opt.Target) + if err != nil { + return nil, errors.Wrap(err, "get resolver") + } + + ref := opt.Target + if !strings.Contains(ref, "@") { + ref = ref + "@" + desc.Digest.String() + } + pusher, err := resolver.Pusher(ctx, ref) + if err != nil { + return nil, errors.Wrap(err, "create pusher") + } + writer, err := pusher.Push(ctx, desc) + if err != nil { + if containerdErrdefs.IsAlreadyExists(err) { + return nil, nil + } + return nil, err + } + + return writer, nil +} + +type store struct { + content.Store + remotes []ocispec.Descriptor +} + +func newStore(base content.Store, remotes []ocispec.Descriptor) *store { + return &store{ + Store: base, + remotes: remotes, + } +} + +func (s *store) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) { + info, err := s.Store.Info(ctx, dgst) + if err != nil { + if !containerdErrdefs.IsNotFound(err) { + return content.Info{}, err + } + for _, desc := range s.remotes { + if desc.Digest == dgst { + return content.Info{ + Digest: desc.Digest, + Size: desc.Size, + }, nil + } + } + return content.Info{}, err + } + return info, nil +} diff --git a/docs/chunk-deduplication.md b/docs/chunk-deduplication.md new file mode 100644 index 00000000000..a33472d060c --- /dev/null +++ b/docs/chunk-deduplication.md @@ -0,0 +1,153 @@ +# Chunk-Level Deduplication: Storage Optimization for Nydus Images + +## Probntroduction + +In container images, there are often a large number of duplicate files or content, and these duplicate parts occupy a large amount of storage space, especially in high-density deployment scenarios. As the number of Nydus images grows, it will bring many problems such as low storage space utilization and excessive consumption of bandwidth resources. To do this, an effective deduplication mechanism (deduplication) needs to be designed to solve this problem. + +Unlike traditional OCI, which distributes images at a layer-granular level, the smallest unit of a Nydus image is a chunk, so the deduplication algorithm needs to be deduplicated in chunk units. At the same time, we want to deduplicate multiple aspects of the Nydus image, including between Nydus images and between different versions of the same Nydus image. No matter which deduplication method is essentially to deduplicate the repeated chunks in the image, only one duplicate chunk is retained, and the reference to the chunk is used instead of other duplicate chunks to reduce the storage space occupation, so as to maximize the data transmission and storage capabilities of Nydus and improve the access speed and efficiency of the image. 
+
+## General idea
+
+The deduplication algorithm first selects the duplicate chunks in the images according to image information such as the number of occurrences of a chunk, the chunk size, and the image and version a chunk belongs to, and generates a chunkdict. The chunkdict records the unique identifier (fingerprint) of each chunk; only the chunkdict needs to be stored, and other images can refer to its chunks by reference.
+
+The deduplication algorithm is divided into two parts: the first part is the DBSCAN clustering algorithm, which deduplicates across different images; the second part is the exponential smoothing algorithm, which deduplicates across different versions of the same image.
+
+**The general process is as follows:**
+
+1. Store the image information in the local database.
+2. Extract the image information and call the DBSCAN clustering algorithm to deduplicate across different images.
+3. After deduplicating with the dictionary from step 2, call the exponential smoothing algorithm for each image separately to deduplicate across image versions.
+4. Collect the deduplication dictionary generated by the two algorithms and write it to disk.
+5. Generate a chunkdict image and push it to the remote repository.
+
+## Algorithm detailed process
+
+### Overall Input
+
+```shell
+# The backend-related flags are optional.
+nydusify chunkdict generate \
+    --sources registry.com/redis:nydus_7.0.1,registry.com/redis:nydus_7.0.2,registry.com/redis:nydus_7.0.3 \
+    --target registry.com/redis:nydus_chunkdict \
+    --source-insecure --target-insecure \
+    --backend-config-file /path/to/backend-config.json \
+    --backend-type oss
+```
+
+## Use the chunk dict image to reduce the incremental size of the new image
+
+```shell
+nydusify convert \
+    --source registry.com/redis:OCI_7.0.4 \
+    --target registry.com/redis:nydus_7.0.4 \
+    --chunk-dict registry.com/redis:nydus_chunkdict
+```
+
+***
+`nydusify chunkdict generate` calls the subcommand `nydus-image chunkdict generate` to store image information into the database and generate a new bootstrap as the chunkdict bootstrap.
+
+Download multiple Nydus images in advance and put them into the repository as a dataset, for example 10 consecutive versions of redis and alpine, then execute the command `nydus-image chunkdict generate` to store the chunk and blob information in the chunk and blob tables of the database.
+
+```shell
+# Store the metadata of multiple images into the database
+nydus-image chunkdict generate --source \
+    /path/localhost:5000:redis:nydus_7.0.1/nydus_bootstrap, \
+    /path/localhost:5000:redis:nydus_7.0.2/nydus_bootstrap, \
+    /path/localhost:5000:redis:nydus_7.0.3/nydus_bootstrap \
+    --bootstrap /path/to/chunkdict_bootstrap \
+    --database /path/to/database.db \
+    --output-json /path/to/nydus_bootstrap_output.json
+```
+
+***
+
+### Deduplication algorithm
+
+#### Algorithm 1 Deduplication between different images (DBSCAN clustering algorithm)
+
+***
+**Basic principle:** DBSCAN is a density-based clustering algorithm that examines the connectivity between samples through sample density: samples of the same category are closely connected, that is, there are always samples of the same category not far from any sample of that category. Therefore it can group objects that are dense and close together, can find clusters of arbitrary shape, and does not need the number of clusters to be specified in advance, which makes it suitable for high-density deployment scenarios.
+
+**Input:** Read the chunk information in the database and store it in a chunk list. Chunk information includes: image_name, version, chunk_blob_id, chunk_digest, chunk_compressed_size, and so on.
+
+**Output:** The chunk dictionary corresponding to each image cluster.
+
+**Basic steps:**
+**1.** For every image, select a portion of its versions as the training set and the rest as the test set according to a fixed ratio.
+
+**2.** Split all chunks in the training set into lists keyed by image_name; each list corresponds to one image and contains all chunks of that image.
+
+**3.** Cluster these images with the DBSCAN (Density-Based Spatial Clustering of Applications with Noise) algorithm.
+
+***
+3.1 Initialize the core point set $\Omega$ as an empty set, and set the clustering radius $\gamma = 0.5$ and the sample number threshold $MinPts = 10$.
+
+3.2 Loop through each image and its corresponding chunk list, and calculate its distance to the other images according to the following formula (a code sketch of this distance follows step 6 below):
+$$distance(x,y) = \frac{\lvert C(R_x) \cup C(R_y) \rvert - \lvert C(R_x) \cap C(R_y) \rvert}{\lvert C(R_x) \cup C(R_y) \rvert}$$
+where $C(R_x)$ denotes the set of unique chunks of image $x$ over the training set. Count the images $y$ with $distance(x,y) \leq \gamma$; if there are $M$ such images and $M \geq MinPts$, add image $x$ to the core point set, and each such image $y$ is said to lie in the neighborhood of the core image $x$.
+
+3.3 Initialize the number of clusters $k = 0$, then iterate over the core image set: for each core image, add all images in its neighborhood to a queue; if an image in the neighborhood is itself a core image, add all images in its neighborhood to the queue as well. Classify the images in this queue into one cluster, and continue traversing the core image set until all core images have been visited.
+
+3.4 For each cluster, calculate the frequency with which every chunk appears across the cluster's training-set images. Add the chunks that appear in more than $90\%$ of those images to the dictionary of that cluster, generating a set of <cluster, dictionary> pairs.
+***
+**4.** Adjust the neighborhood radius $\gamma$ and repeat step 3 to obtain multiple deduplication dictionaries.
+
+**5.** Evaluate the dictionaries from step 4 on the test set, and select the chunk dictionary that yields the smallest test-set storage space.
+
+**6.** Remove the chunks in the dictionary selected in step 5 from all images (training set and test set), then repeat steps 1-5 to generate further chunk dictionaries until the maximum number of iterations (7) is reached, or the proportion of discrete (unclustered) images exceeds 80% of the total number of images.
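+
+The distance above is essentially the Jaccard distance between the unique chunk sets of two images. The following is only an illustrative sketch of that computation, not the code used by nydus-image; the function name and the representation of a chunk set as a set of digest strings are assumptions made for the sketch.
+
+```rust
+use std::collections::HashSet;
+
+/// Jaccard-style distance between the unique chunk-digest sets of two images:
+/// (|union| - |intersection|) / |union|.
+fn image_distance(x: &HashSet<String>, y: &HashSet<String>) -> f64 {
+    let union = x.union(y).count() as f64;
+    if union == 0.0 {
+        return 0.0; // Two images without chunks are treated as identical.
+    }
+    let intersection = x.intersection(y).count() as f64;
+    (union - intersection) / union
+}
+
+fn main() {
+    let a: HashSet<String> = ["c1", "c2", "c3"].iter().map(|s| s.to_string()).collect();
+    let b: HashSet<String> = ["c2", "c3", "c4"].iter().map(|s| s.to_string()).collect();
+    // union = 4, intersection = 2, so the distance is 0.5.
+    assert!((image_distance(&a, &b) - 0.5).abs() < 1e-9);
+}
+```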
+
+How the DBSCAN algorithm divides images into clusters is illustrated in the following diagram:
+![dbscan algorithm](images/nydus_chunkdict_dbscan_algorithm.png)
+**Remark:** The picture in this section and the associated DBSCAN algorithm description are referenced from: [https://en.wikipedia.org/wiki/DBSCAN](https://en.wikipedia.org/wiki/DBSCAN)
+
+#### Algorithm 2 Deduplication between different versions of the image (exponential smoothing algorithm)
+
+***
+**Basic principle:** Exponential smoothing is a method for prediction and smoothing of time-series data. Its basic principle is to take a weighted average of the data, giving higher weight to the more recently repeated chunks and continuously updating the smoothed value, so that newer chunks have a greater impact on future predictions while the influence of older data gradually weakens.
+
+**Input:** The training set and test set after deduplication by Algorithm 1.
+
+**Output:** The chunk dictionary corresponding to each image.
+
+**Basic steps:**
+**1.** Split all chunks in the training set into lists keyed by image_name; each list corresponds to one image and contains all chunks of that image.
+
+**2.** Sort the versions of each image chronologically, and score each chunk according to the exponential smoothing formula (a code sketch of this scoring rule follows the test procedure below):
+$$S_0 = 0, \quad S_t = \alpha Y_{t-1} + (1 - \alpha) S_{t-1}$$
+where $\alpha = 0.5$ and $Y_{t-1}$ indicates whether the chunk appeared in the previous version: 1 if it did, otherwise 0.
+
+**3.** Compute the score of each chunk and select all chunks with a score greater than the threshold $THs$ as the chunk dictionary. Deduplicate the image versions in the test set with it and calculate the storage space they occupy.
+
+**4.** Vary $THs$ from 0.8 down to 0.5 in steps of 0.05 and repeat steps 2 and 3 to generate multiple chunk dictionaries.
+
+**5.** Choose the chunk dictionary that minimizes the test set's storage space.
+***
+
+### Exponential Smoothing Algorithm Test
+
+#### Procedure
+
+**1.** Download 10 versions of each OCI image and record the total size in MB.
+**2.** Convert the OCI images to Nydus format and record the total size in MB after conversion.
+**3.** Select three versions of each image to generate a chunk dictionary, convert the remaining seven versions using that chunk dictionary, and record the total size in MB after deduplication.
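+
+The scoring rule in step 2 of Algorithm 2 can be sketched as follows. This is only an illustrative example of the formula, not the implementation used by nydus-image; the function names and the representation of each version as a set of chunk digests are assumptions made for the sketch.
+
+```rust
+use std::collections::{HashMap, HashSet};
+
+/// Exponential-smoothing score for every chunk digest across the chronologically
+/// ordered versions of one image: S_0 = 0, S_t = alpha * Y_{t-1} + (1 - alpha) * S_{t-1},
+/// where Y_{t-1} is 1 if the chunk appeared in version t-1, otherwise 0.
+fn smoothing_scores(versions: &[HashSet<String>], alpha: f64) -> HashMap<String, f64> {
+    let mut scores = HashMap::new();
+    // Every chunk digest seen in any version of this image.
+    let all_chunks: HashSet<&String> = versions.iter().flatten().collect();
+    for digest in all_chunks {
+        let mut s = 0.0;
+        for version in versions {
+            let y = if version.contains(digest) { 1.0 } else { 0.0 };
+            s = alpha * y + (1.0 - alpha) * s;
+        }
+        scores.insert(digest.clone(), s);
+    }
+    scores
+}
+
+/// Chunks scoring above the threshold THs form the per-image chunk dictionary.
+fn select_dictionary(scores: &HashMap<String, f64>, ths: f64) -> Vec<String> {
+    scores
+        .iter()
+        .filter(|(_, s)| **s > ths)
+        .map(|(digest, _)| digest.clone())
+        .collect()
+}
+```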
+ +#### Image Information Table + +| **Image Name** | **Number of Versions** | **Total Image Size (OCI)** | **Total Image Size (Nydus)** | +| :------------: | :--------------------: | :------------------------: | :--------------------------: | +| **Redis** | 10 | 341.78 MB | 419.37 MB | +| **Ubuntu** | 10 | 290.26 MB | 308.59 MB | +| **Alpine** | 10 | 26.9 MB | 27.55 MB | + +#### Deduplication Results Table + +| **Image Name** | **Chunkdict Image Size** | **Total Image Size (Nydus after Deduplicating)** | **Deduplicating Rate** | +| :------------: | :----------------------: | :----------------------------------------------: | :--------------------: | +| **Redis** | 41.87 MB | 319.48 MB | 23.82% | +| **Ubuntu** | 30.8 MB | 140.28 MB | 54.54% | +| **Alpine** | 2.74 MB | 24.7 MB | 10.34% | + +*** diff --git a/docs/images/nydus_chunkdict_dbscan_algorithm.png b/docs/images/nydus_chunkdict_dbscan_algorithm.png new file mode 100644 index 00000000000..236f5c8bd3a Binary files /dev/null and b/docs/images/nydus_chunkdict_dbscan_algorithm.png differ diff --git a/rafs/src/metadata/cached_v5.rs b/rafs/src/metadata/cached_v5.rs index d6ba0b02742..61e4dd1d1b4 100644 --- a/rafs/src/metadata/cached_v5.rs +++ b/rafs/src/metadata/cached_v5.rs @@ -994,6 +994,7 @@ mod cached_tests { 0, BlobFeatures::_V5_NO_EXT_BLOB_TABLE, meta.flags, + false, ); let mut cached_inode = CachedInodeV5::new(blob_table, meta.clone()); cached_inode.load(&meta, &mut reader).unwrap(); diff --git a/rafs/src/metadata/layout/v5.rs b/rafs/src/metadata/layout/v5.rs index a1b8db1ea63..52a4c21a358 100644 --- a/rafs/src/metadata/layout/v5.rs +++ b/rafs/src/metadata/layout/v5.rs @@ -563,6 +563,7 @@ impl RafsV5BlobTable { compressed_size: u64, blob_features: BlobFeatures, flags: RafsSuperFlags, + is_chunkdict: bool, ) -> u32 { let blob_index = self.entries.len() as u32; let mut blob_info = BlobInfo::new( @@ -578,6 +579,9 @@ impl RafsV5BlobTable { blob_info.set_compressor(flags.into()); blob_info.set_digester(flags.into()); blob_info.set_prefetch_info(prefetch_offset as u64, prefetch_size as u64); + if is_chunkdict { + blob_info.set_chunkdict_generated(true); + } self.entries.push(Arc::new(blob_info)); self.extended.add( diff --git a/rafs/src/metadata/layout/v6.rs b/rafs/src/metadata/layout/v6.rs index 5099e4a722c..6a64607fb07 100644 --- a/rafs/src/metadata/layout/v6.rs +++ b/rafs/src/metadata/layout/v6.rs @@ -1754,7 +1754,8 @@ impl RafsV6Blob { blob_features.bits() ); return false; - } else if !tarfs_mode + } else if !blob_features.contains(BlobFeatures::IS_CHUNKDICT_GENERATED) + && !tarfs_mode && ci_uncompr_size != count * size_of::() as u64 { error!( @@ -1819,6 +1820,7 @@ impl RafsV6BlobTable { blob_toc_digest: [u8; 32], blob_meta_size: u64, blob_toc_size: u32, + is_chunkdict: bool, header: BlobCompressionContextHeader, cipher_object: Arc, cipher_context: Option, @@ -1851,6 +1853,8 @@ impl RafsV6BlobTable { blob_info.set_blob_toc_size(blob_toc_size); blob_info.set_cipher_info(flags.into(), cipher_object, cipher_context); + blob_info.set_chunkdict_generated(is_chunkdict); + self.entries.push(Arc::new(blob_info)); blob_index @@ -2725,6 +2729,7 @@ mod tests { [0; 32], 0, 0, + false, BlobCompressionContextHeader::default(), Arc::new(crypt::Algorithm::Aes128Xts.new_cipher().unwrap()), Some(CipherContext::default()), @@ -2767,6 +2772,7 @@ mod tests { [0; 32], 0, 0, + false, BlobCompressionContextHeader::default(), Arc::new(crypt::Algorithm::Aes128Xts.new_cipher().unwrap()), Some(CipherContext::default()), diff --git a/smoke/tests/image_test.go 
b/smoke/tests/image_test.go index d8f06bc41ef..e4f27dc6765 100644 --- a/smoke/tests/image_test.go +++ b/smoke/tests/image_test.go @@ -7,6 +7,7 @@ package tests import ( "fmt" "path/filepath" + "strings" "testing" "github.com/dragonflyoss/nydus/smoke/tests/tool" @@ -127,7 +128,8 @@ func (i *ImageTestSuite) TestConvertAndCopyImage(t *testing.T, ctx tool.Context, // Copy image targetCopied := fmt.Sprintf("%s_copied", target) copyCmd := fmt.Sprintf( - "%s %s copy --source %s --target %s --nydus-image %s --work-dir %s --push-chunk-size 1MB", + // "%s %s copy --source %s --target %s --nydus-image %s --work-dir %s --push-chunk-size 1MB", + "%s %s copy --source %s --target %s --nydus-image %s --work-dir %s", ctx.Binary.Nydusify, logLevel, target, targetCopied, ctx.Binary.Builder, ctx.Env.WorkDir, ) tool.RunWithoutOutput(t, copyCmd) @@ -140,6 +142,86 @@ func (i *ImageTestSuite) TestConvertAndCopyImage(t *testing.T, ctx tool.Context, tool.RunWithoutOutput(t, checkCmd) } +func (i *ImageTestSuite) TestGenerateChunkdicts() test.Generator { + images := []string{"redis:7.0.1", "redis:7.0.2", "redis:7.0.3"} + var sources []string + for _, image := range images { + image = i.prepareImage(i.T, image) + sources = append(sources, image) + } + scenarios := tool.DescartesIterator{} + scenarios. + Dimension(paramFSVersion, []interface{}{"5", "6"}) + return func() (name string, testCase test.Case) { + if !scenarios.HasNext() { + return + } + scenario := scenarios.Next() + ctx := tool.DefaultContext(i.T) + ctx.Build.FSVersion = scenario.GetString(paramFSVersion) + return "chunkdict:" + scenario.Str(), func(t *testing.T) { + i.TestChundict(t, *ctx, sources) + } + } +} + +func (i *ImageTestSuite) TestChundict(t *testing.T, ctx tool.Context, images []string) { + trainImage := images[:len(images)-1] + testImage := images[len(images)-1] + + ctx.PrepareWorkDir(t) + defer ctx.Destroy(t) + + // Prepare options. + enableOCIRef := "" + enableEncrypt := "" + fsVersion := fmt.Sprintf("--fs-version %s", ctx.Build.FSVersion) + logLevel := "--log-level warn" + compressor := "--compressor lz4_block" + + // Prepare nydus images. + var targets []string + for _, image := range trainImage { + target := fmt.Sprintf("%s-nydus-%s", image, uuid.NewString()) + targets = append(targets, target) + + fmt.Println("target:", target) + convertCmd := fmt.Sprintf( + "%s %s convert --source %s --target %s %s %s %s %s --nydus-image %s --work-dir %s %s", + ctx.Binary.Nydusify, logLevel, image, target, fsVersion, enableOCIRef, "", enableEncrypt, ctx.Binary.Builder, ctx.Env.WorkDir, compressor, + ) + tool.RunWithoutOutput(t, convertCmd) + } + targetsStr := strings.Join(targets, ",") + + // Generate chunkdict. + chunkdict := fmt.Sprintf("%s/redis:nydus-chunkdict-%s", strings.SplitN(testImage, "/", 2)[0], uuid.NewString()) + fmt.Println("chunkdict:", chunkdict) + generateCmd := fmt.Sprintf( + "%s %s chunkdict generate --sources %s --target %s --source-insecure --target-insecure --nydus-image %s --work-dir %s", + ctx.Binary.Nydusify, logLevel, targetsStr, chunkdict, ctx.Binary.Builder, filepath.Join(ctx.Env.WorkDir, "generate"), + ) + tool.RunWithoutOutput(t, generateCmd) + fmt.Println("generateCmd:", generateCmd) + + // Covert test image by chunkdict. 
+ target := fmt.Sprintf("%s-nydus-%s", testImage, uuid.NewString()) + convertCmd := fmt.Sprintf( + "%s %s convert --source %s --target %s %s %s %s %s --nydus-image %s --work-dir %s %s", + ctx.Binary.Nydusify, logLevel, testImage, target, fsVersion, enableOCIRef, "", enableEncrypt, ctx.Binary.Builder, ctx.Env.WorkDir, compressor, + ) + tool.RunWithoutOutput(t, convertCmd) + + // Check nydus image covert by chunkdict. + checkCmd := fmt.Sprintf( + "%s %s check --target %s --nydus-image %s --nydusd %s --work-dir %s", + ctx.Binary.Nydusify, logLevel, target, ctx.Binary.Builder, ctx.Binary.Nydusd, filepath.Join(ctx.Env.WorkDir, "check"), + ) + tool.RunWithoutOutput(t, checkCmd) + fmt.Println("checkCmd:", checkCmd) + ctx.Destroy(t) +} + func (i *ImageTestSuite) prepareImage(t *testing.T, image string) string { if i.preparedImages == nil { i.preparedImages = make(map[string]string) diff --git a/src/bin/nydus-image/deduplicate.rs b/src/bin/nydus-image/deduplicate.rs index 83de9188940..c28130e023f 100644 --- a/src/bin/nydus-image/deduplicate.rs +++ b/src/bin/nydus-image/deduplicate.rs @@ -4,13 +4,21 @@ //! Deduplicate for Chunk. use anyhow::{Context, Result}; +use core::cmp::Ordering; use nydus_api::ConfigV2; +use nydus_builder::BuildContext; +use nydus_builder::ConversionType; use nydus_builder::Tree; -use nydus_rafs::metadata::RafsSuper; +use nydus_builder::{ChunkdictBlobInfo, ChunkdictChunkInfo}; +use nydus_rafs::metadata::{RafsSuper, RafsVersion}; use nydus_storage::device::BlobInfo; use rusqlite::{params, Connection}; +use std::collections::HashSet; +use std::collections::{BTreeMap, HashMap}; +use std::convert::TryFrom; use std::fs; -use std::path::Path; +use std::path::{Path, PathBuf}; +use std::result::Result::Ok; use std::sync::{Arc, Mutex}; #[derive(Debug)] @@ -18,7 +26,7 @@ pub enum DatabaseError { SqliteError(rusqlite::Error), PoisonError(String), // Add other database error variants here as needed, e.g.: - // MysqlError(mysql::Error), + // MysqlError(mysql::Error). } impl std::fmt::Display for DatabaseError { @@ -26,7 +34,7 @@ impl std::fmt::Display for DatabaseError { match *self { DatabaseError::SqliteError(ref err) => err.fmt(f), DatabaseError::PoisonError(ref err) => write!(f, "PoisonError: {}", err), - // Add other error type formatting here + // Add other error type formatting here. } } } @@ -47,16 +55,22 @@ pub trait Database { fn create_blob_table(&self) -> Result<()>; /// Inserts chunk information into the database. - fn insert_chunk(&self, chunk_info: &Chunk) -> Result<()>; + fn insert_chunk(&self, chunk_info: &ChunkdictChunkInfo) -> Result<()>; /// Inserts blob information into the database. - fn insert_blob(&self, blob_info: &Blob) -> Result<()>; + fn insert_blob(&self, blob_info: &ChunkdictBlobInfo) -> Result<()>; /// Retrieves all chunk information from the database. - fn get_chunks(&self) -> Result>; + fn get_chunks(&self) -> Result>; + + /// Retrieves all chunk information from the database filtered by blob ID. + fn get_chunks_by_blob_id(&self, blob_id: &str) -> Result>; /// Retrieves all blob information from the database. - fn get_blobs(&self) -> Result>; + fn get_blobs(&self) -> Result>; + + /// Retrieves blob information from the database filtered by blob ID. + fn get_blob_by_id(&self, blob_id: &str) -> Result; } pub struct SqliteDatabase { @@ -66,15 +80,11 @@ pub struct SqliteDatabase { impl SqliteDatabase { pub fn new(database_url: &str) -> Result { - // Delete the database file if it exists. + // Connect to a database that already exists. 
if let Ok(metadata) = fs::metadata(database_url) { if metadata.is_file() { - if let Err(err) = fs::remove_file(database_url) { - warn!( - "Warning: Unable to delete existing database file: {:?}.", - err - ); - } + } else { + panic!("Warning: Unable to find existing database file."); } } @@ -106,25 +116,93 @@ impl Database for SqliteDatabase { BlobTable::create(&self.blob_table).context("Failed to create blob table") } - fn insert_chunk(&self, chunk: &Chunk) -> Result<()> { + fn insert_chunk(&self, chunk: &ChunkdictChunkInfo) -> Result<()> { self.chunk_table .insert(chunk) .context("Failed to insert chunk") } - fn insert_blob(&self, blob: &Blob) -> Result<()> { + fn insert_blob(&self, blob: &ChunkdictBlobInfo) -> Result<()> { self.blob_table .insert(blob) .context("Failed to insert blob") } - fn get_chunks(&self) -> Result> { + fn get_chunks(&self) -> Result> { ChunkTable::list_all(&self.chunk_table).context("Failed to get chunks") } - fn get_blobs(&self) -> Result> { + fn get_chunks_by_blob_id(&self, blob_id: &str) -> Result> { + ChunkTable::list_all_by_blob_id(&self.chunk_table, blob_id).context("Failed to get chunks") + } + + fn get_blobs(&self) -> Result> { BlobTable::list_all(&self.blob_table).context("Failed to get blobs") } + + fn get_blob_by_id(&self, blob_id: &str) -> Result { + BlobTable::list_by_id(&self.blob_table, blob_id).context("Failed to get blob") + } +} + +/// Get fs version from bootstrap file. +fn get_fs_version(bootstrap_path: &Path) -> Result { + let (sb, _) = RafsSuper::load_from_file(bootstrap_path, Arc::new(ConfigV2::default()), false)?; + RafsVersion::try_from(sb.meta.version).context("Failed to get RAFS version number") +} + +/// Checks if all Bootstrap versions are consistent. +/// If they are inconsistent, returns an error and prints the version of each Bootstrap. +pub fn check_bootstrap_versions_consistency( + ctx: &mut BuildContext, + bootstrap_paths: &[PathBuf], +) -> Result<()> { + let mut versions = Vec::new(); + + for bootstrap_path in bootstrap_paths { + let version = get_fs_version(bootstrap_path)?; + versions.push((bootstrap_path.clone(), version)); + } + + if !versions.is_empty() { + let first_version = versions[0].1; + ctx.fs_version = first_version; + if versions.iter().any(|(_, v)| *v != first_version) { + for (path, version) in &versions { + println!("Bootstrap path {:?} has version {:?}", path, version); + } + return Err(anyhow!( + "Bootstrap versions are inconsistent, cannot use chunkdict." + )); + } + } + + Ok(()) +} + +// Get parent bootstrap context for chunkdict bootstrap. +pub fn update_ctx_from_parent_bootstrap( + ctx: &mut BuildContext, + bootstrap_path: &PathBuf, +) -> Result<()> { + let (sb, _) = RafsSuper::load_from_file(bootstrap_path, Arc::new(ConfigV2::default()), false)?; + + // Obtain the features of the first blob to use as the features for the blobs in chunkdict. 
+ if let Some(first_blob) = sb.superblock.get_blob_infos().first() { + ctx.blob_features = first_blob.features(); + } + + let config = sb.meta.get_config(); + config.check_compatibility(&sb.meta)?; + + if config.is_tarfs_mode { + ctx.conversion_type = ConversionType::TarToTarfs; + } + ctx.fs_version = + RafsVersion::try_from(sb.meta.version).context("Failed to get RAFS version")?; + ctx.compressor = config.compressor; + + Ok(()) } pub struct Deduplicate { @@ -147,12 +225,14 @@ impl Deduplicate { &mut self, bootstrap_path: &Path, config: Arc, + image_reference: String, + version: String, ) -> anyhow::Result>> { let (sb, _) = RafsSuper::load_from_file(bootstrap_path, config, false)?; self.create_tables()?; let blob_infos = sb.superblock.get_blob_infos(); self.insert_blobs(&blob_infos)?; - self.insert_chunks(&blob_infos, &sb)?; + self.insert_chunks(&blob_infos, &sb, image_reference, version)?; Ok(blob_infos) } @@ -169,10 +249,14 @@ impl Deduplicate { fn insert_blobs(&mut self, blob_infos: &[Arc]) -> anyhow::Result<()> { for blob in blob_infos { self.db - .insert_blob(&Blob { + .insert_blob(&ChunkdictBlobInfo { blob_id: blob.blob_id().to_string(), blob_compressed_size: blob.compressed_size(), blob_uncompressed_size: blob.uncompressed_size(), + blob_compressor: blob.compressor().to_string(), + blob_meta_ci_compressed_size: blob.meta_ci_compressed_size(), + blob_meta_ci_uncompressed_size: blob.meta_ci_uncompressed_size(), + blob_meta_ci_offset: blob.meta_ci_offset(), }) .context("Failed to insert blob")?; } @@ -183,6 +267,8 @@ impl Deduplicate { &mut self, blob_infos: &[Arc], sb: &RafsSuper, + image_reference: String, + version: String, ) -> anyhow::Result<()> { let process_chunk = &mut |t: &Tree| -> Result<()> { let node = t.lock_node(); @@ -190,7 +276,9 @@ impl Deduplicate { let index = chunk.inner.blob_index(); let chunk_blob_id = blob_infos[index as usize].blob_id(); self.db - .insert_chunk(&Chunk { + .insert_chunk(&ChunkdictChunkInfo { + image_reference: image_reference.to_string(), + version: version.to_string(), chunk_blob_id, chunk_digest: chunk.inner.id().to_string(), chunk_compressed_size: chunk.inner.compressed_size(), @@ -209,27 +297,630 @@ impl Deduplicate { } } +pub struct Algorithm { + algorithm_name: String, + db: D, +} + +// Generate deduplicated chunkdict by exponential_smoothing algorithm. +type VersionMap = HashMap>; +// Generate deduplicated chunkdict by cluster algorithm. +type ImageMap = Vec, Vec>>; + +impl Algorithm { + pub fn new(algorithm: String, db_url: &str) -> anyhow::Result { + let algorithm_name = algorithm; + let db = SqliteDatabase::new(db_url)?; + Ok(Self { algorithm_name, db }) + } + + // Call the algorithm to generate a dictionary. 
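The dictionary generation below ends up with two result shapes, the per-version map and the list of per-cluster maps (the `VersionMap` and `ImageMap` aliases above), and flattens both into a single chunk dictionary. A small sketch of that merge step using plain digest strings; the aliases here only mirror the shape of the real types:

use std::collections::{BTreeSet, HashMap};

type VersionMap = HashMap<String, Vec<String>>;
type ImageMap = Vec<HashMap<Vec<String>, Vec<String>>>;

// Flatten both structures into one de-duplicated dictionary of digests.
fn merge(version: &VersionMap, image: &ImageMap) -> Vec<String> {
    let mut seen = BTreeSet::new();
    for clustering in image {
        for chunks in clustering.values() {
            seen.extend(chunks.iter().cloned());
        }
    }
    for chunks in version.values() {
        seen.extend(chunks.iter().cloned());
    }
    seen.into_iter().collect()
}

fn main() {
    let mut version = VersionMap::new();
    version.insert("redis".into(), vec!["d1".into(), "d2".into()]);
    let image: ImageMap = vec![HashMap::from([(
        vec!["redis".to_string(), "nginx".to_string()],
        vec!["d2".to_string(), "d3".to_string()],
    )])];
    assert_eq!(merge(&version, &image), vec!["d1", "d2", "d3"]);
}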
+ pub fn chunkdict_generate( + &mut self, + ) -> anyhow::Result<(Vec, Vec, Vec)> { + let all_chunks: Vec = self.db.chunk_table.list_all()?; + let mut chunkdict_chunks: Vec = Vec::new(); + let mut chunkdict_blobs: Vec = Vec::new(); + let mut core_image = Vec::new(); + let mut noise_points = Vec::new(); + + let (chunkdict_version, chunkdict_image) = match &self.algorithm_name as &str { + "exponential_smoothing" => Self::deduplicate_version(&all_chunks)?, + _ => { + bail!("Unsupported algorithm name:, please use a valid algorithm name, such as exponential_smoothing") + } + }; + for single_clustering in chunkdict_image { + for (image_list, cluster_dictionary) in single_clustering { + core_image.extend(image_list); + chunkdict_chunks.extend(cluster_dictionary); + } + } + for (_, dictionary) in chunkdict_version { + chunkdict_chunks.extend(dictionary); + } + let mut chunkdict_size = 0; + for i in &chunkdict_chunks { + chunkdict_size += i.chunk_compressed_size; + } + info!( + "Chunkdict size is {}", + chunkdict_size as f64 / 1024 as f64 / 1024 as f64 + ); + for chunk in all_chunks { + if !core_image.contains(&chunk.image_reference) + && !noise_points.contains(&chunk.image_reference) + { + noise_points.push(chunk.image_reference.clone()); + } + } + Self::fill_chunkdict(self, &mut chunkdict_chunks, &mut chunkdict_blobs)?; + Ok((chunkdict_chunks, chunkdict_blobs, noise_points)) + } + + /// Baseed chunk list to fill chunkdict, including all chunks in the same blob and all blobs in the chunkdict. + fn fill_chunkdict( + &mut self, + chunkdict_chunks: &mut Vec, + chunkdict_blobs: &mut Vec, + ) -> Result<()> { + let mut blob_ids = std::collections::HashSet::new(); + for chunk in chunkdict_chunks.iter() { + blob_ids.insert(chunk.chunk_blob_id.clone()); + } + for blob_id in blob_ids { + let mut chunks = self.db.get_chunks_by_blob_id(&blob_id)?; + chunks = chunks + .into_iter() + .collect::>() + .into_iter() + .collect::>(); + for chunk in chunks { + if !chunkdict_chunks.contains(&chunk) { + chunkdict_chunks.push(chunk); + } + } + chunkdict_blobs.push(self.db.get_blob_by_id(&blob_id)?); + } + Ok(()) + } + + // Algorithm "exponential_smoothing" + // List all chunk and sort them by the order in chunk table. + // Score each chunk by "exponential_smoothing" formula. + // Select chunks whose score is greater than threshold and generate chunk dictionary. 
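Concretely, the comment above describes the first-order recurrence score[i] = alpha * dup[i] + (1 - alpha) * score[i-1], with alpha = 0.5, dup[i] = 1.0 when chunk i's digest already occurred in the previous version and 0.0 otherwise, and chunks kept when their score exceeds the threshold. A minimal sketch of just the recurrence and the thresholding, assuming the duplicate flags have already been computed:

// Exponentially smoothed duplication score over an ordered chunk sequence.
// `dup[i]` is 1.0 if chunk i re-appears from the previous version, else 0.0.
fn smooth_scores(dup: &[f64], alpha: f64) -> Vec<f64> {
    let mut scores = Vec::with_capacity(dup.len());
    for (i, &d) in dup.iter().enumerate() {
        let prev = if i == 0 { 0.0 } else { scores[i - 1] };
        scores.push(alpha * d + (1.0 - alpha) * prev);
    }
    scores
}

fn main() {
    let dup = [1.0, 1.0, 0.0, 1.0];
    let scores = smooth_scores(&dup, 0.5);
    // Scores: 0.5, 0.75, 0.375, 0.6875.
    let threshold = 0.5;
    let selected: Vec<usize> = scores
        .iter()
        .enumerate()
        .filter(|(_, s)| **s > threshold)
        .map(|(i, _)| i)
        .collect();
    assert_eq!(selected, vec![1, 3]);
}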
+ fn exponential_smoothing( + all_chunks: Vec, + threshold: f64, + ) -> anyhow::Result> { + let alpha = 0.5; + let mut smoothed_data = Vec::new(); + + let mut last_start_version_index = 0; + let mut start_version_index = 0; + let mut last_end_version_index = 0; + + for (chunk_index, chunk) in all_chunks.iter().enumerate() { + let mut is_duplicate: f64 = 0.0; + if chunk.version == all_chunks[0].version { + let smoothed_score: f64 = 0.0; + smoothed_data.push(smoothed_score); + } else { + if all_chunks[chunk_index - 1].version != all_chunks[chunk_index].version { + last_start_version_index = start_version_index; + start_version_index = chunk_index; + last_end_version_index = chunk_index - 1; + } + for last_chunk in all_chunks + .iter() + .take(last_end_version_index + 1) + .skip(last_start_version_index) + { + if chunk.chunk_digest == last_chunk.chunk_digest { + is_duplicate = 1.0; + break; + } + } + let smoothed_score: f64 = + alpha * is_duplicate + (1.0 - alpha) * smoothed_data[chunk_index - 1]; + smoothed_data.push(smoothed_score); + } + } + + let mut chunkdict: Vec = Vec::new(); + for i in 0..smoothed_data.len() { + let chunk = ChunkdictChunkInfo { + image_reference: all_chunks[i].image_reference.clone(), + version: all_chunks[i].version.clone(), + chunk_blob_id: all_chunks[i].chunk_blob_id.clone(), + chunk_digest: all_chunks[i].chunk_digest.clone(), + chunk_compressed_offset: all_chunks[i].chunk_compressed_offset, + chunk_uncompressed_offset: all_chunks[i].chunk_uncompressed_offset, + chunk_compressed_size: all_chunks[i].chunk_compressed_size, + chunk_uncompressed_size: all_chunks[i].chunk_uncompressed_size, + }; + if smoothed_data[i] > threshold { + chunkdict.push(chunk); + } + } + + // Deduplicate chunk dictionary. + let mut unique_chunks: BTreeMap = BTreeMap::new(); + for chunk in &chunkdict { + if !unique_chunks.contains_key(&chunk.chunk_digest) { + unique_chunks.insert(chunk.chunk_digest.clone(), chunk.clone()); + } + } + let unique_chunk_list: Vec = unique_chunks.values().cloned().collect(); + Ok(unique_chunk_list) + } + + /// Calculate the distance between two images. + fn distance( + image1: &[ChunkdictChunkInfo], + image2: &[ChunkdictChunkInfo], + ) -> anyhow::Result { + // The total size of all chunks in both images. + let mut image1_size: u64 = 0; + let mut image2_size: u64 = 0; + + for chunk1 in image1 { + image1_size += chunk1.chunk_compressed_size as u64; + } + for chunk2 in image2 { + image2_size += chunk2.chunk_compressed_size as u64; + } + + // The total size of the chunk repeated between two images. + let all_chunks: Vec<&ChunkdictChunkInfo> = image1.iter().chain(image2.iter()).collect(); + let mut compressed_size_map: std::collections::HashMap = + std::collections::HashMap::new(); + let mut processed_digests: HashSet<&String> = HashSet::new(); + + for chunk in all_chunks { + if processed_digests.contains(&chunk.chunk_digest) { + let size = compressed_size_map + .entry(chunk.chunk_digest.clone()) + .or_insert(0); + *size += chunk.chunk_compressed_size as u64; + } + processed_digests.insert(&chunk.chunk_digest); + } + + let repeat_size: u64 = compressed_size_map.values().cloned().sum(); + let distance: f64 = 1.0 - (repeat_size as f64 / ((image1_size + image2_size) as f64)); + Ok(distance) + } + + /// Divide the chunk list into sublists by image name. 
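The `distance` helper above scores two images as distance = 1 - repeat_size / (size_1 + size_2), where repeat_size sums the compressed bytes of every digest occurrence after the first across the combined chunk list. A compact sketch over (digest, compressed_size) pairs with the same convention; the function name and inputs are illustrative:

use std::collections::HashSet;

// distance = 1 - repeated_bytes / total_bytes_of_both_images
fn image_distance(a: &[(&str, u64)], b: &[(&str, u64)]) -> f64 {
    let total: u64 = a.iter().chain(b.iter()).map(|(_, sz)| sz).sum();
    let mut seen = HashSet::new();
    let mut repeated: u64 = 0;
    for (digest, size) in a.iter().chain(b.iter()) {
        // Only occurrences after the first contribute to the repeated volume.
        if !seen.insert(*digest) {
            repeated += size;
        }
    }
    1.0 - repeated as f64 / total as f64
}

fn main() {
    // Two single-chunk images sharing the same digest: half of the bytes repeat.
    let d = image_distance(&[("d1", 100)], &[("d1", 100)]);
    assert!((d - 0.5).abs() < 1e-9);
    // No shared digests: maximum distance of 1.0.
    assert!((image_distance(&[("d1", 100)], &[("d2", 100)]) - 1.0).abs() < 1e-9);
}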
+ fn divide_by_image(all_chunks: &[ChunkdictChunkInfo]) -> anyhow::Result> { + let mut image_chunks: std::collections::HashMap> = + std::collections::HashMap::new(); + let mut datadict: Vec = Vec::new(); + for chunk in all_chunks { + image_chunks + .entry(chunk.image_reference.clone()) + .or_insert(Vec::new()) + .push(chunk.clone()); + } + for (index, chunks) in image_chunks { + let data_point = DataPoint { + image_reference: index, + chunk_list: chunks, + visited: false, + clustered: false, + cluster_id: 0, + }; + datadict.push(data_point); + } + Ok(datadict) + } + + fn divide_set( + chunks: &[ChunkdictChunkInfo], + train_percentage: f64, + ) -> anyhow::Result<(Vec, Vec)> { + // Create a HashMap to store the list of chunks for each image_reference. + let mut image_chunks: BTreeMap> = BTreeMap::new(); + + // Group chunks into image_reference. + for chunk in chunks { + let entry = image_chunks + .entry(chunk.image_reference.clone()) + .or_insert(Vec::new()); + entry.push(chunk.clone()); + } + + // Create the final training and testing sets. + let mut train_set: Vec = Vec::new(); + let mut test_set: Vec = Vec::new(); + + // Iterate through the list of Chunks for each image_reference. + for (_, chunk_list) in image_chunks.iter_mut() { + let mut version_chunks: BTreeMap> = + BTreeMap::new(); + // Group the chunks in the image into version. + for chunk in chunk_list { + let entry = version_chunks + .entry(CustomString(chunk.version.clone())) + .or_insert(Vec::new()); + entry.push(chunk.clone()); + } + + let num_version_groups = version_chunks.len(); + let num_train_groups = (num_version_groups as f64 * train_percentage) as usize; + let version_groups = version_chunks.into_iter().collect::>(); + let (train_version_groups, test_version_groups) = + version_groups.split_at(num_train_groups); + + for (_, train_chunks) in train_version_groups { + for chunk in train_chunks { + train_set.push(chunk.clone()); + } + } + + for (_, test_chunks) in test_version_groups { + for chunk in test_chunks { + test_set.push(chunk.clone()); + } + } + } + Ok((train_set, test_set)) + } + + /// Dbscan clustering algorithm. + fn dbsacn(data_point: &mut Vec, radius: f64) -> anyhow::Result<&Vec> { + let min_points = 10; + let mut cluster_id = 1; + + for i in 0..data_point.len() { + if data_point[i].visited { + continue; + } + if data_point[i].clustered { + continue; + } + + let mut neighbors = Vec::new(); + for j in 0..data_point.len() { + let distance = + Self::distance(&data_point[i].chunk_list, &data_point[j].chunk_list)?; + if !data_point[j].visited && distance <= radius { + neighbors.push(j); + } + } + if neighbors.len() < min_points { + data_point[i].clustered = false; + } else { + Self::expand_cluster(data_point, i, cluster_id, radius, min_points)?; + cluster_id += 1; + } + } + Ok(data_point) + } + + /// Core point expansion cluster in dbscan algorithm. 
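`divide_set` above splits each image's chunks by whole version groups rather than by individual chunks: with a 0.7 training fraction, the first 70% of that image's versions (in sorted version order) become training data and the remainder becomes the test set. A sketch of that split, assuming the version groups are already ordered; the names here are invented for illustration:

// Split ordered (version, items) groups so that roughly `train_fraction`
// of the version groups land in the training set.
fn split_by_version<T: Clone>(
    groups: &[(&str, Vec<T>)],
    train_fraction: f64,
) -> (Vec<T>, Vec<T>) {
    let cut = (groups.len() as f64 * train_fraction) as usize;
    let (train_groups, test_groups) = groups.split_at(cut);
    let train = train_groups.iter().flat_map(|(_, v)| v.clone()).collect();
    let test = test_groups.iter().flat_map(|(_, v)| v.clone()).collect();
    (train, test)
}

fn main() {
    let groups = vec![
        ("1.0", vec!["a", "b"]),
        ("1.1", vec!["c"]),
        ("2.0", vec!["d"]),
    ];
    let (train, test) = split_by_version(&groups, 0.7);
    // floor(3 * 0.7) = 2 version groups go to training.
    assert_eq!(train, vec!["a", "b", "c"]);
    assert_eq!(test, vec!["d"]);
}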
+ fn expand_cluster( + data_point: &mut Vec, + i: usize, + cluster_id: i32, + radius: f64, + min_points: usize, + ) -> anyhow::Result<()> { + data_point[i].clustered = true; + data_point[i].cluster_id = cluster_id; + + let mut stack = vec![i]; + while let Some(q) = stack.pop() { + if data_point[q].visited { + continue; + } + data_point[q].visited = true; + let mut q_neighbors = Vec::new(); + for j in 0..data_point.len() { + let distance = + Self::distance(&data_point[q].chunk_list, &data_point[j].chunk_list)?; + if !data_point[j].visited && distance <= radius { + q_neighbors.push(j); + } + } + if q_neighbors.len() >= min_points { + for &r_index in &q_neighbors { + if !data_point[r_index].visited { + data_point[r_index].visited = true; + stack.push(r_index) + } + if !data_point[r_index].clustered { + data_point[r_index].clustered = true; + data_point[r_index].cluster_id = cluster_id; + } + } + } else { + data_point[i].clustered = false; + } + } + Ok(()) + } + + /// Aggregate the chunks in each cluster into a dictionary. + fn aggregate_chunk( + data_point: &[DataPoint], + ) -> anyhow::Result, Vec>> { + // Divide chunk list according to clusters. + let mut cluster_map: HashMap> = HashMap::new(); + for (index, point) in data_point.iter().enumerate() { + if point.clustered { + let cluster_id = point.cluster_id; + cluster_map + .entry(cluster_id) + .or_insert(Vec::new()) + .push(index); + } + } + + // Iterate through each cluster. + let mut dictionary: HashMap, Vec> = HashMap::new(); + for (_, cluster_points) in cluster_map.iter() { + let mut image_total_counts: HashMap<&str, usize> = HashMap::new(); + let mut image_list: Vec = Vec::new(); + // Count the total number of images in the cluster. + for &point_index in cluster_points { + let point = &data_point[point_index]; + let image_total_count = image_total_counts + .entry(&point.image_reference) + .or_insert(0); + *image_total_count += 1; + + image_list.push(point.image_reference.clone()); + } + + // Count the number of images in which chunks appear in the cluster. + let mut chunk_digest_counts: HashMap = HashMap::new(); + for &point_index in cluster_points { + let point = &data_point[point_index]; + let chunk_digest_set: HashSet = point + .chunk_list + .iter() + .map(|chunk| chunk.chunk_digest.clone()) + .collect(); + for chunk_digest in chunk_digest_set { + let count = chunk_digest_counts + .entry(chunk_digest.to_string()) + .or_insert(0); + *count += 1; + } + } + + let mut chunk_list: Vec = Vec::new(); + let mut added_chunk_digests: HashSet = HashSet::new(); + for &point_index in cluster_points { + let point = &data_point[point_index]; + for chunk in &point.chunk_list { + let chunk_digest = &chunk.chunk_digest; + if !added_chunk_digests.contains(chunk_digest) { + let count = chunk_digest_counts.get(chunk_digest).unwrap_or(&0); + if *count as f64 / image_total_counts.len() as f64 >= 0.9 { + chunk_list.push(chunk.clone()); + added_chunk_digests.insert(chunk_digest.to_string()); + } + } + } + } + dictionary.insert(image_list, chunk_list); + } + Ok(dictionary) + } + + fn deduplicate_image( + all_chunks: Vec, + ) -> anyhow::Result, Vec>>> { + let train_percentage = 0.7; + let max_cluster_count = 7; + let mut counter = 0; + let all_chunks_clone = all_chunks; + let mut data_dict: Vec, Vec>> = Vec::new(); + + let (mut train, mut test) = Self::divide_set(&all_chunks_clone, train_percentage)?; + while counter < max_cluster_count { + // Parameter settings. 
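`aggregate_chunk` above admits a chunk into a cluster's dictionary only when its digest occurs in at least 90% of the distinct images belonging to that cluster. The core of that selection is a per-digest image count, sketched here over simple (image, digest) pairs rather than the full `DataPoint` structures:

use std::collections::{HashMap, HashSet};

// Keep digests that appear in at least `ratio` of the distinct images.
fn frequent_digests(pairs: &[(&str, &str)], ratio: f64) -> Vec<String> {
    let images: HashSet<&str> = pairs.iter().map(|(img, _)| *img).collect();
    let mut per_digest: HashMap<&str, HashSet<&str>> = HashMap::new();
    for (img, digest) in pairs {
        per_digest.entry(*digest).or_default().insert(*img);
    }
    let mut out: Vec<String> = per_digest
        .into_iter()
        .filter(|(_, imgs)| imgs.len() as f64 / images.len() as f64 >= ratio)
        .map(|(d, _)| d.to_string())
        .collect();
    out.sort();
    out
}

fn main() {
    let pairs = [
        ("img-a", "d1"), ("img-a", "d2"),
        ("img-b", "d1"), ("img-b", "d3"),
        ("img-c", "d1"), ("img-c", "d2"),
    ];
    // d1 appears in 3/3 images, d2 in 2/3, d3 in 1/3.
    assert_eq!(frequent_digests(&pairs, 0.9), vec!["d1"]);
}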
+ let mut data_point = Self::divide_by_image(&train)?; + let all_train_length = data_point.len(); + let mut radius = 0.5; + let max_radius = 0.9; + let mut test_chunk_sizes = Vec::new(); + let mut min_test_size: u64 = std::u64::MAX; + let mut min_data_dict = HashMap::new(); + let mut data_cluster_length = 0; + + // Adjust the radius size to select the dictionary that tests best. + while radius <= max_radius { + let data_cluster = Self::dbsacn(&mut data_point, radius)?; + data_cluster_length = data_cluster.len(); + + let data_dict = Self::aggregate_chunk(data_cluster)?; + + let all_chunks: HashSet<&ChunkdictChunkInfo> = + data_dict.values().flat_map(|v| v.iter()).collect(); + let mut total_test_set_size: u64 = 0; + + for chunk in test.iter() { + if !all_chunks.contains(chunk) { + total_test_set_size += chunk.chunk_compressed_size as u64; + } + } + test_chunk_sizes.push((radius, total_test_set_size)); + min_test_size = total_test_set_size; + if total_test_set_size <= min_test_size { + min_test_size = total_test_set_size; + min_data_dict = data_dict; + } + radius += 0.05; + } + debug!("test set size is {}", min_test_size); + + let min_chunk_list: Vec = min_data_dict + .values() + .flat_map(|chunk_list| chunk_list.iter()) + .cloned() + .collect(); + let mut to_remove = Vec::new(); + for chunk in train.iter() { + if min_chunk_list.contains(chunk) { + to_remove.push(chunk.clone()); + } + } + for chunk in &to_remove { + train.retain(|c| c.chunk_digest != chunk.chunk_digest); + } + for chunk in &to_remove { + test.retain(|c| c.chunk_digest != chunk.chunk_digest); + } + if (data_cluster_length as f64 / all_train_length as f64) < 0.2 { + break; + } + data_dict.push(min_data_dict); + counter += 1; + } + Ok(data_dict) + } + + pub fn deduplicate_version( + all_chunks: &[ChunkdictChunkInfo], + ) -> anyhow::Result<(VersionMap, ImageMap)> { + let mut all_chunks_size = 0; + for i in all_chunks { + all_chunks_size += i.chunk_compressed_size; + } + info!( + "All chunk size is {}", + all_chunks_size as f64 / 1024 as f64 / 1024 as f64 + ); + + let train_percentage = 0.7; + let datadict = Self::deduplicate_image(all_chunks.to_owned())?; + let (train, test) = Self::divide_set(all_chunks, train_percentage)?; + let mut train_set_size = 0; + for i in &train { + train_set_size += i.chunk_compressed_size; + } + info!( + "Train set size is {}", + train_set_size as f64 / 1024 as f64 / 1024 as f64 + ); + + let mut test_set_size = 0; + for i in &test { + test_set_size += i.chunk_compressed_size; + } + info!( + "Test set size is {}", + test_set_size as f64 / 1024 as f64 / 1024 as f64 + ); + + let mut version_datadict: HashMap> = HashMap::new(); + let mut data_point = Self::divide_by_image(&train)?; + + let mut threshold = 0.5; + let max_threshold = 0.8; + + let mut test_total_size: u32 = 0; + let mut min_test_size: u32 = std::u32::MAX; + let mut min_data_dict = HashMap::new(); + + while threshold <= max_threshold { + version_datadict.clear(); + for point in data_point.iter_mut() { + for single_dictionary in &datadict { + for (key, value) in single_dictionary.iter() { + if key.contains(&point.image_reference) { + let mut to_remove = Vec::new(); + for chunk in point.chunk_list.iter() { + if value.contains(chunk) { + to_remove.push(chunk.clone()); + } + } + for chunk in to_remove { + point.chunk_list.retain(|c| c != &chunk); + } + } + } + } + let chunk_dict = Self::exponential_smoothing(point.chunk_list.clone(), threshold)?; + version_datadict.insert(point.image_reference.clone(), chunk_dict); + } + + let mut test_by_image = 
Self::divide_by_image(&test)?; + for point in test_by_image.iter_mut() { + if version_datadict.contains_key(&point.image_reference.clone()) { + let mut to_remove = Vec::new(); + let mut vec_string = Vec::new(); + let chunkdict_option = version_datadict.get(&point.image_reference); + if let Some(chunkdict) = chunkdict_option { + for i in chunkdict { + vec_string.push(i.chunk_digest.clone()); + } + } + for chunk in point.chunk_list.iter() { + if vec_string.contains(&chunk.chunk_digest) { + to_remove.push(chunk.clone()); + } + } + for chunk in to_remove { + point.chunk_list.retain(|c| c != &chunk); + } + } + for chunk in point.chunk_list.iter() { + test_total_size = test_total_size + .checked_add(chunk.chunk_compressed_size) + .unwrap_or(test_total_size); + } + } + if test_total_size <= min_test_size { + min_test_size = test_total_size; + min_data_dict = version_datadict.clone(); + } + threshold += 0.05; + } + info!( + "After deduplicating test set size is {} and deduplicating rate is {} ", + min_test_size as f64 / 1024 as f64 / 1024 as f64, + 1.0 - (min_test_size as f64) / (test_set_size as f64) + ); + Ok((min_data_dict, datadict)) + } +} + +#[allow(dead_code)] +#[derive(Debug)] +struct DataPoint { + image_reference: String, + chunk_list: Vec, + visited: bool, + clustered: bool, + cluster_id: i32, +} + pub trait Table: Sync + Send + Sized + 'static where Err: std::error::Error + 'static, { - /// clear table. + /// Clear table. fn clear(&self) -> Result<(), Err>; - /// create table. + /// Create table. fn create(&self) -> Result<(), Err>; - /// insert data. + /// Insert data. fn insert(&self, table: &T) -> Result<(), Err>; - /// select all data. + /// Select all data. fn list_all(&self) -> Result, Err>; - /// select data with offset and limit. + /// Select data with offset and limit. fn list_paged(&self, offset: i64, limit: i64) -> Result, Err>; } -#[derive(Debug)] +#[derive()] pub struct ChunkTable { conn: Arc>, } @@ -248,19 +939,128 @@ impl ChunkTable { conn: Arc::new(Mutex::new(conn)), }) } + + /// Select all data filtered by blob ID. + fn list_all_by_blob_id(&self, blob_id: &str) -> Result, DatabaseError> { + let mut offset = 0; + let limit: i64 = 100; + let mut all_chunks_by_blob_id = Vec::new(); + + loop { + let chunks = self.list_paged_by_blob_id(blob_id, offset, limit)?; + if chunks.is_empty() { + break; + } + + all_chunks_by_blob_id.extend(chunks); + offset += limit; + } + + Ok(all_chunks_by_blob_id) + } + + /// Select data with offset and limit filtered by blob ID. 
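`list_all` and `list_all_by_blob_id` above share one pagination idiom: fetch LIMIT/OFFSET pages of 100 rows and stop at the first empty page; the paged query itself follows below. A sketch of that loop against a stand-in page fetcher, where the closure plays the role of the SQL query:

// Drain a paged source by walking OFFSET in steps of `limit`
// until an empty page signals the end.
fn list_all<T>(mut fetch_page: impl FnMut(i64, i64) -> Vec<T>, limit: i64) -> Vec<T> {
    let mut offset = 0;
    let mut all = Vec::new();
    loop {
        let page = fetch_page(offset, limit);
        if page.is_empty() {
            break;
        }
        all.extend(page);
        offset += limit;
    }
    all
}

fn main() {
    // Stand-in for a SELECT ... LIMIT ?1 OFFSET ?2 query over 250 rows.
    let rows: Vec<i64> = (0..250).collect();
    let fetch = |offset: i64, limit: i64| -> Vec<i64> {
        rows.iter()
            .skip(offset as usize)
            .take(limit as usize)
            .cloned()
            .collect()
    };
    let all = list_all(fetch, 100);
    assert_eq!(all.len(), 250);
    assert_eq!(all.last(), Some(&249));
}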
+ fn list_paged_by_blob_id( + &self, + blob_id: &str, + offset: i64, + limit: i64, + ) -> Result, DatabaseError> { + let conn_guard = self + .conn + .lock() + .map_err(|e| DatabaseError::PoisonError(e.to_string()))?; + let mut stmt: rusqlite::Statement<'_> = conn_guard + .prepare( + "SELECT id, image_reference, version, chunk_blob_id, chunk_digest, chunk_compressed_size, + chunk_uncompressed_size, chunk_compressed_offset, chunk_uncompressed_offset from chunk + WHERE chunk_blob_id = ?1 + ORDER BY id LIMIT ?2 OFFSET ?3", + )?; + let chunk_iterator = stmt.query_map(params![blob_id, limit, offset], |row| { + Ok(ChunkdictChunkInfo { + image_reference: row.get(1)?, + version: row.get(2)?, + chunk_blob_id: row.get(3)?, + chunk_digest: row.get(4)?, + chunk_compressed_size: row.get(5)?, + chunk_uncompressed_size: row.get(6)?, + chunk_compressed_offset: row.get(7)?, + chunk_uncompressed_offset: row.get(8)?, + }) + })?; + let mut chunks = Vec::new(); + for chunk in chunk_iterator { + chunks.push(chunk.map_err(DatabaseError::SqliteError)?); + } + Ok(chunks) + } } -#[derive(Debug)] -pub struct Chunk { - chunk_blob_id: String, - chunk_digest: String, - chunk_compressed_size: u32, - chunk_uncompressed_size: u32, - chunk_compressed_offset: u64, - chunk_uncompressed_offset: u64, +#[derive(Debug, Clone)] +struct CustomString(String); + +impl Ord for CustomString { + /// Extract the numbers in the string. + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + let mut current_number = String::new(); + + // Parse numbers in strings. + let mut numbers1 = Vec::new(); + let mut numbers2 = Vec::new(); + + for ch in self.0.chars() { + if ch.is_ascii_digit() { + current_number.push(ch); + } else if !current_number.is_empty() { + if let Ok(number) = current_number.parse::() { + numbers1.push(number); + } + current_number.clear(); + } + } + if !current_number.is_empty() { + if let Ok(number) = current_number.parse::() { + numbers1.push(number); + } + } + current_number.clear(); + + for ch in other.0.chars() { + if ch.is_ascii_digit() { + current_number.push(ch); + } else if !current_number.is_empty() { + if let Ok(number) = current_number.parse::() { + numbers2.push(number); + } + current_number.clear(); + } + } + if !current_number.is_empty() { + if let Ok(number) = current_number.parse::() { + numbers2.push(number); + } + } + current_number.clear(); + numbers1.cmp(&numbers2) + } +} + +impl PartialOrd for CustomString { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl PartialEq for CustomString { + fn eq(&self, other: &Self) -> bool { + self.0 == other.0 + } } -impl Table for ChunkTable { +impl Eq for CustomString {} + +impl Table for ChunkTable { fn clear(&self) -> Result<(), DatabaseError> { self.conn .lock() @@ -277,6 +1077,8 @@ impl Table for ChunkTable { .execute( "CREATE TABLE IF NOT EXISTS chunk ( id INTEGER PRIMARY KEY, + image_reference TEXT, + version TEXT, chunk_blob_id TEXT NOT NULL, chunk_digest TEXT, chunk_compressed_size INT, @@ -290,12 +1092,14 @@ impl Table for ChunkTable { Ok(()) } - fn insert(&self, chunk: &Chunk) -> Result<(), DatabaseError> { + fn insert(&self, chunk: &ChunkdictChunkInfo) -> Result<(), DatabaseError> { self.conn .lock() .map_err(|e| DatabaseError::PoisonError(e.to_string()))? 
.execute( "INSERT INTO chunk( + image_reference, + version, chunk_blob_id, chunk_digest, chunk_compressed_size, @@ -303,9 +1107,11 @@ impl Table for ChunkTable { chunk_compressed_offset, chunk_uncompressed_offset ) - VALUES (?1, ?2, ?3, ?4, ?5, ?6); + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8); ", rusqlite::params![ + chunk.image_reference, + chunk.version, chunk.chunk_blob_id, chunk.chunk_digest, chunk.chunk_compressed_size, @@ -318,7 +1124,7 @@ impl Table for ChunkTable { Ok(()) } - fn list_all(&self) -> Result, DatabaseError> { + fn list_all(&self) -> Result, DatabaseError> { let mut offset = 0; let limit: i64 = 100; let mut all_chunks = Vec::new(); @@ -336,25 +1142,31 @@ impl Table for ChunkTable { Ok(all_chunks) } - fn list_paged(&self, offset: i64, limit: i64) -> Result, DatabaseError> { + fn list_paged( + &self, + offset: i64, + limit: i64, + ) -> Result, DatabaseError> { let conn_guard = self .conn .lock() .map_err(|e| DatabaseError::PoisonError(e.to_string()))?; let mut stmt: rusqlite::Statement<'_> = conn_guard .prepare( - "SELECT id, chunk_blob_id, chunk_digest, chunk_compressed_size, + "SELECT id, image_reference, version, chunk_blob_id, chunk_digest, chunk_compressed_size, chunk_uncompressed_size, chunk_compressed_offset, chunk_uncompressed_offset from chunk ORDER BY id LIMIT ?1 OFFSET ?2", )?; let chunk_iterator = stmt.query_map(params![limit, offset], |row| { - Ok(Chunk { - chunk_blob_id: row.get(1)?, - chunk_digest: row.get(2)?, - chunk_compressed_size: row.get(3)?, - chunk_uncompressed_size: row.get(4)?, - chunk_compressed_offset: row.get(5)?, - chunk_uncompressed_offset: row.get(6)?, + Ok(ChunkdictChunkInfo { + image_reference: row.get(1)?, + version: row.get(2)?, + chunk_blob_id: row.get(3)?, + chunk_digest: row.get(4)?, + chunk_compressed_size: row.get(5)?, + chunk_uncompressed_size: row.get(6)?, + chunk_compressed_offset: row.get(7)?, + chunk_uncompressed_offset: row.get(8)?, }) })?; let mut chunks = Vec::new(); @@ -384,15 +1196,38 @@ impl BlobTable { conn: Arc::new(Mutex::new(conn)), }) } -} -pub struct Blob { - blob_id: String, - blob_compressed_size: u64, - blob_uncompressed_size: u64, + pub fn list_by_id(&self, blob_id: &str) -> Result { + let conn_guard = self + .conn + .lock() + .map_err(|e| DatabaseError::PoisonError(e.to_string()))?; + let mut stmt = conn_guard.prepare( + "SELECT blob_id, blob_compressed_size, blob_uncompressed_size, blob_compressor, blob_meta_ci_compressed_size, blob_meta_ci_uncompressed_size, blob_meta_ci_offset FROM blob WHERE blob_id = ?1", + )?; + let mut blob_iterator = stmt.query_map([blob_id], |row| { + Ok(ChunkdictBlobInfo { + blob_id: row.get(0)?, + blob_compressed_size: row.get(1)?, + blob_uncompressed_size: row.get(2)?, + blob_compressor: row.get(3)?, + blob_meta_ci_compressed_size: row.get(4)?, + blob_meta_ci_uncompressed_size: row.get(5)?, + blob_meta_ci_offset: row.get(6)?, + }) + })?; + + if let Some(blob) = blob_iterator.next() { + blob.map_err(DatabaseError::SqliteError) + } else { + Err(DatabaseError::SqliteError( + rusqlite::Error::QueryReturnedNoRows, + )) + } + } } -impl Table for BlobTable { +impl Table for BlobTable { fn clear(&self) -> Result<(), DatabaseError> { self.conn .lock() @@ -408,10 +1243,14 @@ impl Table for BlobTable { .map_err(|e| DatabaseError::PoisonError(e.to_string()))? 
.execute( "CREATE TABLE IF NOT EXISTS blob ( - id INTEGER PRIMARY KEY, - blob_id TEXT NOT NULL, - blob_compressed_size INT, - blob_uncompressed_size INT + id INTEGER PRIMARY KEY, + blob_id TEXT NOT NULL, + blob_compressed_size INT, + blob_uncompressed_size INT, + blob_compressor TEXT, + blob_meta_ci_compressed_size INT, + blob_meta_ci_uncompressed_size INT, + blob_meta_ci_offset INT )", [], ) @@ -419,7 +1258,7 @@ impl Table for BlobTable { Ok(()) } - fn insert(&self, blob: &Blob) -> Result<(), DatabaseError> { + fn insert(&self, blob: &ChunkdictBlobInfo) -> Result<(), DatabaseError> { self.conn .lock() .map_err(|e| DatabaseError::PoisonError(e.to_string()))? @@ -427,21 +1266,29 @@ impl Table for BlobTable { "INSERT INTO blob ( blob_id, blob_compressed_size, - blob_uncompressed_size + blob_uncompressed_size, + blob_compressor, + blob_meta_ci_compressed_size, + blob_meta_ci_uncompressed_size, + blob_meta_ci_offset ) - VALUES (?1, ?2, ?3); + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7); ", rusqlite::params![ blob.blob_id, blob.blob_compressed_size, - blob.blob_uncompressed_size + blob.blob_uncompressed_size, + blob.blob_compressor, + blob.blob_meta_ci_compressed_size, + blob.blob_meta_ci_uncompressed_size, + blob.blob_meta_ci_offset, ], ) .map_err(DatabaseError::SqliteError)?; Ok(()) } - fn list_all(&self) -> Result, DatabaseError> { + fn list_all(&self) -> Result, DatabaseError> { let mut offset = 0; let limit: i64 = 100; let mut all_blobs = Vec::new(); @@ -459,20 +1306,24 @@ impl Table for BlobTable { Ok(all_blobs) } - fn list_paged(&self, offset: i64, limit: i64) -> Result, DatabaseError> { + fn list_paged(&self, offset: i64, limit: i64) -> Result, DatabaseError> { let conn_guard = self .conn .lock() .map_err(|e| DatabaseError::PoisonError(e.to_string()))?; let mut stmt: rusqlite::Statement<'_> = conn_guard.prepare( - "SELECT blob_id, blob_compressed_size, blob_uncompressed_size from blob + "SELECT blob_id, blob_compressed_size, blob_uncompressed_size, blob_compressor, blob_meta_ci_compressed_size, blob_meta_ci_uncompressed_size, blob_meta_ci_offset from blob ORDER BY id LIMIT ?1 OFFSET ?2", )?; let blob_iterator = stmt.query_map(params![limit, offset], |row| { - Ok(Blob { + Ok(ChunkdictBlobInfo { blob_id: row.get(0)?, blob_compressed_size: row.get(1)?, blob_uncompressed_size: row.get(2)?, + blob_compressor: row.get(3)?, + blob_meta_ci_compressed_size: row.get(4)?, + blob_meta_ci_uncompressed_size: row.get(5)?, + blob_meta_ci_offset: row.get(6)?, }) })?; let mut blobs = Vec::new(); @@ -488,14 +1339,45 @@ mod tests { use super::*; use rusqlite::Result; + #[test] + fn test_partial_cmp() -> Result<(), Box> { + let custom_string1 = CustomString("nydus_1.2.3".to_string()); + let custom_string2 = CustomString("nydus_1.2.10".to_string()); + let custom_string3 = CustomString("nydus_2.0".to_string()); + + assert!(custom_string1 < custom_string2); + assert!(custom_string2 < custom_string3); + assert!(custom_string1 < custom_string3); + + assert!(custom_string1 <= custom_string2); + assert!(custom_string2 <= custom_string3); + assert!(custom_string1 <= custom_string3); + + assert!(custom_string2 > custom_string1); + assert!(custom_string3 > custom_string2); + assert!(custom_string3 > custom_string1); + + assert!(custom_string2 >= custom_string1); + assert!(custom_string3 >= custom_string2); + assert!(custom_string3 >= custom_string1); + + assert_eq!(custom_string1, CustomString("nydus_1.2.3".to_string())); + assert_ne!(custom_string1, custom_string2); + Ok(()) + } + #[test] fn test_blob_table() -> Result<(), 
Box> { let blob_table = BlobTable::new_in_memory()?; blob_table.create()?; - let blob = Blob { + let blob = ChunkdictBlobInfo { blob_id: "BLOB123".to_string(), blob_compressed_size: 1024, blob_uncompressed_size: 2048, + blob_compressor: "zstd".to_string(), + blob_meta_ci_compressed_size: 1024, + blob_meta_ci_uncompressed_size: 2048, + blob_meta_ci_offset: 0, }; blob_table.insert(&blob)?; let blobs = blob_table.list_all()?; @@ -503,6 +1385,16 @@ mod tests { assert_eq!(blobs[0].blob_id, blob.blob_id); assert_eq!(blobs[0].blob_compressed_size, blob.blob_compressed_size); assert_eq!(blobs[0].blob_uncompressed_size, blob.blob_uncompressed_size); + assert_eq!(blobs[0].blob_compressor, blob.blob_compressor); + assert_eq!( + blobs[0].blob_meta_ci_compressed_size, + blob.blob_meta_ci_compressed_size + ); + assert_eq!( + blobs[0].blob_meta_ci_uncompressed_size, + blob.blob_meta_ci_uncompressed_size + ); + assert_eq!(blobs[0].blob_meta_ci_offset, blob.blob_meta_ci_offset); Ok(()) } @@ -510,7 +1402,9 @@ mod tests { fn test_chunk_table() -> Result<(), Box> { let chunk_table = ChunkTable::new_in_memory()?; chunk_table.create()?; - let chunk = Chunk { + let chunk = ChunkdictChunkInfo { + image_reference: "REDIS".to_string(), + version: "1.0.0".to_string(), chunk_blob_id: "BLOB123".to_string(), chunk_digest: "DIGEST123".to_string(), chunk_compressed_size: 512, @@ -519,8 +1413,21 @@ mod tests { chunk_uncompressed_offset: 0, }; chunk_table.insert(&chunk)?; + let chunk2 = ChunkdictChunkInfo { + image_reference: "REDIS02".to_string(), + version: "1.0.0".to_string(), + chunk_blob_id: "BLOB456".to_string(), + chunk_digest: "DIGEST123".to_string(), + chunk_compressed_size: 512, + chunk_uncompressed_size: 1024, + chunk_compressed_offset: 0, + chunk_uncompressed_offset: 0, + }; + chunk_table.insert(&chunk2)?; let chunks = chunk_table.list_all()?; - assert_eq!(chunks.len(), 1); + assert_eq!(chunks[0].image_reference, chunk.image_reference); + assert_eq!(chunks[0].version, chunk.version); + assert_eq!(chunks.len(), 2); assert_eq!(chunks[0].chunk_blob_id, chunk.chunk_blob_id); assert_eq!(chunks[0].chunk_digest, chunk.chunk_digest); assert_eq!(chunks[0].chunk_compressed_size, chunk.chunk_compressed_size); @@ -536,6 +1443,11 @@ mod tests { chunks[0].chunk_uncompressed_offset, chunk.chunk_uncompressed_offset ); + + let chunks = chunk_table.list_all_by_blob_id(&chunk.chunk_blob_id)?; + assert_eq!(chunks[0].chunk_blob_id, chunk.chunk_blob_id); + assert_eq!(chunks.len(), 1); + Ok(()) } @@ -544,10 +1456,14 @@ mod tests { let blob_table = BlobTable::new_in_memory()?; blob_table.create()?; for i in 0..200 { - let blob = Blob { + let blob = ChunkdictBlobInfo { blob_id: format!("BLOB{}", i), blob_compressed_size: i, blob_uncompressed_size: i * 2, + blob_compressor: "zstd".to_string(), + blob_meta_ci_compressed_size: i, + blob_meta_ci_uncompressed_size: i * 2, + blob_meta_ci_offset: i * 3, }; blob_table.insert(&blob)?; } @@ -556,6 +1472,10 @@ mod tests { assert_eq!(blobs[0].blob_id, "BLOB100"); assert_eq!(blobs[0].blob_compressed_size, 100); assert_eq!(blobs[0].blob_uncompressed_size, 200); + assert_eq!(blobs[0].blob_compressor, "zstd"); + assert_eq!(blobs[0].blob_meta_ci_compressed_size, 100); + assert_eq!(blobs[0].blob_meta_ci_uncompressed_size, 200); + assert_eq!(blobs[0].blob_meta_ci_offset, 300); Ok(()) } @@ -565,7 +1485,9 @@ mod tests { chunk_table.create()?; for i in 0..200 { let i64 = i as u64; - let chunk = Chunk { + let chunk = ChunkdictChunkInfo { + image_reference: format!("REDIS{}", i), + version: format!("1.0.0{}", 
i), chunk_blob_id: format!("BLOB{}", i), chunk_digest: format!("DIGEST{}", i), chunk_compressed_size: i, @@ -577,6 +1499,8 @@ mod tests { } let chunks = chunk_table.list_paged(100, 100)?; assert_eq!(chunks.len(), 100); + assert_eq!(chunks[0].image_reference, "REDIS100"); + assert_eq!(chunks[0].version, "1.0.0100"); assert_eq!(chunks[0].chunk_blob_id, "BLOB100"); assert_eq!(chunks[0].chunk_digest, "DIGEST100"); assert_eq!(chunks[0].chunk_compressed_size, 100); @@ -585,4 +1509,272 @@ mod tests { assert_eq!(chunks[0].chunk_uncompressed_offset, 400); Ok(()) } + + #[test] + fn test_algorithm_exponential_smoothing() -> Result<(), Box> { + let threshold = 0.1; + let mut all_chunk: Vec = Vec::new(); + for i in 0..199 { + let i64 = i as u64; + let chunk = ChunkdictChunkInfo { + image_reference: format!("REDIS{}", 0), + version: format!("1.0.0{}", (i + 1) / 100), + chunk_blob_id: format!("BLOB{}", i), + chunk_digest: format!("DIGEST{}", (i + 1) % 2), + chunk_compressed_size: i, + chunk_uncompressed_size: i * 2, + chunk_compressed_offset: i64 * 3, + chunk_uncompressed_offset: i64 * 4, + }; + all_chunk.push(chunk); + } + let chunkdict = Algorithm::::exponential_smoothing(all_chunk, threshold)?; + assert_eq!(chunkdict.len(), 2); + assert_eq!(chunkdict[0].image_reference, "REDIS0"); + assert_eq!(chunkdict[0].version, "1.0.01"); + assert_eq!(chunkdict[0].chunk_blob_id, "BLOB99"); + assert_eq!(chunkdict[0].chunk_digest, "DIGEST0"); + assert_eq!(chunkdict[0].chunk_compressed_size, 99); + assert_eq!(chunkdict[0].chunk_uncompressed_size, 198); + assert_eq!(chunkdict[0].chunk_compressed_offset, 297); + assert_eq!(chunkdict[0].chunk_uncompressed_offset, 396); + Ok(()) + } + + #[test] + fn test_divide_by_image() -> Result<(), Box> { + let db_url = "./metadata.db"; + let chunk_table = ChunkTable::new(db_url)?; + chunk_table.create()?; + for i in 0..200 { + let i64 = i as u64; + let chunk = ChunkdictChunkInfo { + image_reference: format!("REDIS{}", i / 50), + version: format!("1.0.0{}", (i + 1) / 100), + chunk_blob_id: format!("BLOB{}", i), + chunk_digest: format!("DIGEST{}", (i + 1) % 2), + chunk_compressed_size: i, + chunk_uncompressed_size: i * 2, + chunk_compressed_offset: i64 * 3, + chunk_uncompressed_offset: i64 * 4, + }; + chunk_table.insert(&chunk)?; + } + let algorithm = String::from("exponential_smoothing"); + let algorithm = Algorithm::::new(algorithm, db_url)?; + let all_chunks = algorithm.db.chunk_table.list_all()?; + assert_eq!(all_chunks.len(), 200); + let datadict = Algorithm::::divide_by_image(&all_chunks)?; + assert_eq!(datadict.len(), 4); + assert_eq!(datadict[3].cluster_id, 0); + assert_eq!(datadict[3].chunk_list.len(), 50); + chunk_table.clear()?; + Ok(()) + } + + #[test] + fn test_distance() -> Result<(), Box> { + let mut all_chunks1: Vec = Vec::new(); + for i in 0..200 { + let i64 = i as u64; + let chunk = ChunkdictChunkInfo { + image_reference: format!("REDIS{}", 0), + version: format!("1.0.0{}", (i + 1) / 100), + chunk_blob_id: format!("BLOB{}", i), + chunk_digest: format!("DIGEST{}", (i + 1) % 4), + chunk_compressed_size: 1, + chunk_uncompressed_size: 1, + chunk_compressed_offset: i64 * 3, + chunk_uncompressed_offset: i64 * 4, + }; + all_chunks1.push(chunk); + } + let mut all_chunks2: Vec = Vec::new(); + for i in 0..200 { + let i64 = i as u64; + let chunk = ChunkdictChunkInfo { + image_reference: format!("REDIS{}", 1), + version: format!("1.0.0{}", (i + 1) / 100), + chunk_blob_id: format!("BLOB{}", i), + chunk_digest: format!("DIGEST{}", (i + 1) % 4), + chunk_compressed_size: 1, + 
chunk_uncompressed_size: 1, + chunk_compressed_offset: i64 * 3, + chunk_uncompressed_offset: i64 * 4, + }; + all_chunks2.push(chunk); + } + let datadict = Algorithm::::distance(&all_chunks1, &all_chunks2)?; + assert!( + (datadict - 0.01).abs() <= 0.0001, + "Expected {} to be approximately equal to {} with tolerance {}", + datadict, + 0.01, + 0.0001 + ); + Ok(()) + } + + #[test] + fn test_divide_set() -> Result<(), Box> { + let mut all_chunks: Vec = Vec::new(); + for i in 0..200 { + for j in 0..100 { + let chunk = ChunkdictChunkInfo { + image_reference: format!("REDIS{}", i), + version: format!("1.0.0{}", j / 10), + chunk_blob_id: format!("BLOB{}", j), + chunk_digest: format!("DIGEST{}", j + (i / 100) * 100), + chunk_compressed_size: 1, + chunk_uncompressed_size: 1, + chunk_compressed_offset: 1, + chunk_uncompressed_offset: 1, + }; + all_chunks.push(chunk); + } + } + assert_eq!(all_chunks.len(), 20000); + let (train, test) = Algorithm::::divide_set(&all_chunks, 0.7)?; + assert_eq!(train.len(), 14000); + assert_eq!(train[0].image_reference, "REDIS0"); + assert_eq!(train[0].version, "1.0.00"); + assert_eq!(test.len(), 6000); + assert_eq!(test[0].image_reference, "REDIS0"); + assert_eq!(test[0].version, "1.0.07"); + Ok(()) + } + + #[test] + fn test_dbscan() -> Result<(), Box> { + let mut all_chunks: Vec = Vec::new(); + let radius = 0.6; + for i in 0..200 { + for j in 0..100 { + let chunk = ChunkdictChunkInfo { + image_reference: format!("REDIS{}", i), + version: format!("1.0.0{}", j / 10), + chunk_blob_id: format!("BLOB{}", j), + chunk_digest: format!("DIGEST{}", j + (i / 100) * 100), + chunk_compressed_size: 1, + chunk_uncompressed_size: 1, + chunk_compressed_offset: 1, + chunk_uncompressed_offset: 1, + }; + all_chunks.push(chunk); + } + } + assert_eq!(all_chunks.len(), 20000); + let mut data_point = Algorithm::::divide_by_image(&all_chunks)?; + let datadict = Algorithm::::dbsacn(&mut data_point, radius)?; + assert_eq!(datadict.len(), 200); + if datadict[150].chunk_list[0].chunk_digest == datadict[0].chunk_list[0].chunk_digest { + assert_eq!(datadict[150].cluster_id, 1); + } else { + assert_eq!(datadict[150].cluster_id, 2); + } + assert_eq!(datadict[0].cluster_id, 1); + assert!(datadict[150].clustered); + assert!(datadict[150].visited); + assert_eq!(datadict[0].chunk_list.len(), 100); + Ok(()) + } + + #[test] + fn test_aggregate_chunk() -> Result<(), Box> { + let mut all_chunks: Vec = Vec::new(); + let radius = 0.6; + for i in 0..200 { + for j in 0..100 { + let chunk = ChunkdictChunkInfo { + image_reference: format!("REDIS{}", i), + version: format!("1.0.0{}", (j + 1) / 100), + chunk_blob_id: format!("BLOB{}", j), + chunk_digest: format!("DIGEST{}", j + (i / 100) * 100), + chunk_compressed_size: 1, + chunk_uncompressed_size: 1, + chunk_compressed_offset: 1, + chunk_uncompressed_offset: 1, + }; + all_chunks.push(chunk); + } + } + assert_eq!(all_chunks.len(), 20000); + let mut data_point = Algorithm::::divide_by_image(&all_chunks)?; + let data_cluster = Algorithm::::dbsacn(&mut data_point, radius)?; + let datadict = Algorithm::::aggregate_chunk(&data_cluster)?; + assert_eq!(datadict.len(), 2); + Ok(()) + } + + #[test] + fn test_deduplicate_image() -> Result<(), Box> { + let mut all_chunks: Vec = Vec::new(); + for i in 0..200 { + for j in 0..100 { + let chunk = ChunkdictChunkInfo { + image_reference: format!("REDIS{}", i), + version: format!("1.0.0{}", j / 10), + chunk_blob_id: format!("BLOB{}", j), + chunk_digest: format!("DIGEST{}", j + (i / 100) * 100), + chunk_compressed_size: 1, + 
chunk_uncompressed_size: 1, + chunk_compressed_offset: 1, + chunk_uncompressed_offset: 1, + }; + all_chunks.push(chunk); + } + } + assert_eq!(all_chunks.len(), 20000); + let datadict = Algorithm::::deduplicate_image(all_chunks)?; + for i in datadict.clone() { + for (_, b) in i { + if !b.is_empty() { + assert_eq!(b.len(), 70); + } + } + } + assert_eq!(datadict[0].len(), 2); + assert_eq!(datadict[0].values().len(), 2); + assert_eq!(datadict[1].len(), 0); + assert_eq!(datadict[1].values().len(), 0); + assert_eq!(datadict.len(), 7); + Ok(()) + } + + #[test] + fn test_deduplicate_version() -> Result<(), Box> { + let mut all_chunks: Vec = Vec::new(); + let mut chunkdict: Vec = Vec::new(); + for i in 0..200 { + let i64 = i as u64; + let chunk = ChunkdictChunkInfo { + image_reference: format!("REDIS{}", 0), + version: format!("1.0.0{}", (i + 1) / 20), + chunk_blob_id: format!("BLOB{}", i), + chunk_digest: format!("DIGEST{}", (i + 1) % 2), + chunk_compressed_size: i, + chunk_uncompressed_size: i * 2, + chunk_compressed_offset: i64 * 3, + chunk_uncompressed_offset: i64 * 4, + }; + all_chunks.push(chunk); + } + let (chunkdict_version, chunkdict_image) = + Algorithm::::deduplicate_version(&all_chunks)?; + for (_, dictionary) in chunkdict_version { + chunkdict.extend(dictionary); + } + + assert_eq!(chunkdict[0].image_reference, "REDIS0"); + assert_eq!(chunkdict[0].chunk_compressed_size, 21); + assert_eq!(chunkdict.len(), 2); + + for single_clustering in chunkdict_image { + for (_, cluster_dictionary) in single_clustering { + chunkdict.extend(cluster_dictionary); + } + } + assert_eq!(chunkdict.len(), 2); + Ok(()) + } } diff --git a/src/bin/nydus-image/main.rs b/src/bin/nydus-image/main.rs index de4e1a7e276..5fa3a3a3c10 100644 --- a/src/bin/nydus-image/main.rs +++ b/src/bin/nydus-image/main.rs @@ -13,7 +13,10 @@ extern crate log; extern crate serde_json; #[macro_use] extern crate lazy_static; -use crate::deduplicate::SqliteDatabase; +use crate::deduplicate::{ + check_bootstrap_versions_consistency, update_ctx_from_parent_bootstrap, Deduplicate, + SqliteDatabase, +}; use std::convert::TryFrom; use std::fs::{self, metadata, DirEntry, File, OpenOptions}; use std::os::unix::fs::FileTypeExt; @@ -28,9 +31,9 @@ use nydus::{get_build_time_info, setup_logging}; use nydus_api::{BuildTimeInfo, ConfigV2, LocalFsConfig}; use nydus_builder::{ parse_chunk_dict_arg, ArtifactStorage, BlobCacheGenerator, BlobCompactor, BlobManager, - BootstrapManager, BuildContext, BuildOutput, Builder, ConversionType, DirectoryBuilder, - Feature, Features, HashChunkDict, Merger, Prefetch, PrefetchPolicy, StargzBuilder, - TarballBuilder, WhiteoutSpec, + BootstrapManager, BuildContext, BuildOutput, Builder, ChunkdictBlobInfo, ChunkdictChunkInfo, + ConversionType, DirectoryBuilder, Feature, Features, Generator, HashChunkDict, Merger, + Prefetch, PrefetchPolicy, StargzBuilder, TarballBuilder, WhiteoutSpec, }; use nydus_rafs::metadata::{MergeError, RafsSuper, RafsSuperConfig, RafsVersion}; use nydus_storage::backend::localfs::LocalFs; @@ -45,7 +48,6 @@ use nydus_utils::{ }; use serde::{Deserialize, Serialize}; -use crate::deduplicate::Deduplicate; use crate::unpack::{OCIUnpacker, Unpacker}; use crate::validator::Validator; @@ -386,42 +388,45 @@ fn prepare_cmd_args(bti_string: &'static str) -> App { App::new("chunkdict") .about("deduplicate RAFS filesystem metadata") .subcommand( - App::new("save") - .about("Save chunk info to a database") + App::new("generate") + .about("generate chunk dictionary based on database") + .arg( + 
Arg::new("database") + .long("database") + .help("Database connection address for assisting chunk dictionary generation, e.g. /path/database.db") + .default_value("sqlite:///home/runner/output/database.db") + .required(false), + ) .arg( Arg::new("bootstrap") - .short('B') - .long("bootstrap") - .help("File path of RAFS meta blob/bootstrap") - .required(false), + .long("bootstrap") + .short('B') + .help("Output path of nydus overlaid bootstrap"), + ) + .arg( + Arg::new("blob-dir") + .long("blob-dir") + .short('D') + .help("Directory path to save generated RAFS metadata and data blobs"), + ) + .arg(arg_prefetch_policy.clone()) + .arg(arg_output_json.clone()) + .arg(arg_config.clone()) + .arg( + Arg::new("SOURCE") + .help("bootstrap paths (allow one or more)") + .required(true) + .num_args(1..), + ) + .arg( + Arg::new("verbose") + .long("verbose") + .short('v') + .help("Output message in verbose mode") + .action(ArgAction::SetTrue) + .required(false), ) - .arg( - Arg::new("database") - .long("database") - .help("Database connection URI for assisting chunk dict generation, e.g. sqlite:///path/to/database.db") - .default_value("sqlite://:memory:") - .required(false), - ) - .arg( - Arg::new("blob-dir") - .long("blob-dir") - .short('D') - .conflicts_with("config") - .help( - "Directory for localfs storage backend, hosting data blobs and cache files", - ), - ) - .arg(arg_config.clone()) - .arg( - Arg::new("verbose") - .long("verbose") - .short('v') - .help("Output message in verbose mode") - .action(ArgAction::SetTrue) - .required(false), ) - .arg(arg_output_json.clone()) - ) ); let app = app.subcommand( @@ -775,7 +780,10 @@ fn main() -> Result<()> { Command::create(matches, &build_info) } else if let Some(matches) = cmd.subcommand_matches("chunkdict") { match matches.subcommand_name() { - Some("save") => Command::chunkdict_save(matches.subcommand_matches("save").unwrap()), + Some("generate") => Command::chunkdict_generate( + matches.subcommand_matches("generate").unwrap(), + &build_info, + ), _ => { println!("{}", usage); Ok(()) @@ -1194,32 +1202,150 @@ impl Command { OutputSerializer::dump(matches, build_output, build_info, compressor, version) } - fn chunkdict_save(matches: &ArgMatches) -> Result<()> { - let bootstrap_path = Self::get_bootstrap(matches)?; - let config = Self::get_configuration(matches)?; + fn chunkdict_generate(matches: &ArgMatches, build_info: &BuildTimeInfo) -> Result<()> { + let mut build_ctx = BuildContext { + prefetch: Self::get_prefetch(matches)?, + ..Default::default() + }; let db_url: &String = matches.get_one::("database").unwrap(); - debug!("db_url: {}", db_url); - // For backward compatibility with v2.1. - config - .internal - .set_blob_accessible(matches.get_one::("bootstrap").is_none()); + // Save chunk and blob info to database. + let source_bootstrap_paths: Vec = matches + .get_many::("SOURCE") + .map(|paths| paths.map(PathBuf::from).collect()) + .unwrap(); + + check_bootstrap_versions_consistency(&mut build_ctx, &source_bootstrap_paths)?; + update_ctx_from_parent_bootstrap(&mut build_ctx, &source_bootstrap_paths[0])?; + + for (_, bootstrap_path) in source_bootstrap_paths.iter().enumerate() { + let path_name = bootstrap_path.as_path(); + + // Extract the image name and version name from the bootstrap directory. 
+ let bootstrap_dir = match path_name + .parent() + .and_then(|p| p.file_name().and_then(|f| f.to_str())) + { + Some(dir_str) => dir_str.to_string(), + None => bail!("Invalid Bootstrap directory name"), + }; + let full_image_name: Vec<&str> = bootstrap_dir.split(':').collect(); + let image_name = match full_image_name.get(full_image_name.len() - 2) { + Some(&second_last) => second_last.to_string(), + None => bail!( + "Invalid image name {:?}", + full_image_name.get(full_image_name.len() - 2) + ), + }; + let image_tag = match full_image_name.last() { + Some(&last) => last.to_string(), + None => bail!("Invalid version name {:?}", full_image_name.last()), + }; + // For backward compatibility with v2.1. + let config = Self::get_configuration(matches)?; + config + .internal + .set_blob_accessible(matches.get_one::("bootstrap").is_none()); + let db_strs: Vec<&str> = db_url.split("://").collect(); + if db_strs.len() != 2 || (!db_strs[1].starts_with('/') && !db_strs[1].starts_with(':')) + { + bail!("Invalid database URL: {}", db_url); + } + match db_strs[0] { + "sqlite" => { + let mut deduplicate: Deduplicate = + Deduplicate::::new(db_strs[1])?; + deduplicate.save_metadata(bootstrap_path, config, image_name, image_tag)? + } + _ => { + bail!("Unsupported database type: {}, please use a valid database URI, such as 'sqlite:///path/to/chunkdict.db'.", db_strs[0]) + } + }; + } + info!("Chunkdict metadata is saved at: {:?}", db_url); + // Connecting database and generating chunk dictionary by algorithm "exponential_smoothing". let db_strs: Vec<&str> = db_url.split("://").collect(); if db_strs.len() != 2 || (!db_strs[1].starts_with('/') && !db_strs[1].starts_with(':')) { bail!("Invalid database URL: {}", db_url); } + let algorithm = String::from("exponential_smoothing"); + let _source_bootstrap_paths: Vec = matches + .get_many::("SOURCE") + .map(|paths| paths.map(PathBuf::from).collect()) + .unwrap(); + + let (chunkdict_chunks, chunkdict_blobs, noise_points): ( + Vec, + Vec, + Vec, + ); match db_strs[0] { "sqlite" => { - let mut deduplicate: Deduplicate = - Deduplicate::::new(db_strs[1])?; - deduplicate.save_metadata(bootstrap_path, config)? + let mut algorithm: deduplicate::Algorithm = + deduplicate::Algorithm::::new(algorithm, db_strs[1])?; + let result = algorithm.chunkdict_generate()?; + chunkdict_chunks = result.0; + chunkdict_blobs = result.1; + noise_points = result.2; } _ => { - bail!("Unsupported database type: {}, please use a valid database URI, such as 'sqlite:///path/to/database.db'.", db_strs[0]) + bail!("Unsupported database type: {}, please use a valid database URI, such as 'sqlite:///path/to/chunkdict.db'.", db_strs[0]) } }; - info!("Chunkdict metadata is saved at: {:?}", db_url); + + // Output noise point in DBSCAN clustering algorithm. + info!( + "The length of chunkdict is {}", + Vec::::len(&chunkdict_chunks) + ); + info!("It is not recommended to use image deduplication"); + for image_name in noise_points { + info!("{}", image_name); + } + + // Dump chunkdict to bootstrap. 
+ let chunkdict_bootstrap_path = Self::get_bootstrap_storage(matches)?; + let config = + Self::get_configuration(matches).context("failed to get configuration information")?; + config + .internal + .set_blob_accessible(matches.get_one::("config").is_some()); + build_ctx.configuration = config; + build_ctx.blob_storage = Some(chunkdict_bootstrap_path); + build_ctx + .blob_features + .insert(BlobFeatures::IS_CHUNKDICT_GENERATED); + build_ctx.is_chunkdict_generated = true; + + let mut blob_mgr = BlobManager::new(build_ctx.digester); + + let bootstrap_path = Self::get_bootstrap_storage(matches)?; + let mut bootstrap_mgr = BootstrapManager::new(Some(bootstrap_path), None); + + let output = Generator::generate( + &mut build_ctx, + &mut bootstrap_mgr, + &mut blob_mgr, + chunkdict_chunks, + chunkdict_blobs, + )?; + OutputSerializer::dump( + matches, + output, + build_info, + build_ctx.compressor, + build_ctx.fs_version, + ) + .unwrap(); + info!( + "Chunkdict metadata is saved at: {:?}", + matches + .get_one::("bootstrap") + .map(|s| s.as_str()) + .unwrap_or_default(), + ); + Ok(()) } diff --git a/storage/src/cache/filecache/mod.rs b/storage/src/cache/filecache/mod.rs index fd561c8a60c..e6b8c5b80da 100644 --- a/storage/src/cache/filecache/mod.rs +++ b/storage/src/cache/filecache/mod.rs @@ -266,7 +266,9 @@ impl FileCacheEntry { ); return Err(einval!(msg)); } - let meta = if blob_info.meta_ci_is_valid() { + let meta = if blob_info.meta_ci_is_valid() + || blob_info.has_feature(BlobFeatures::IS_CHUNKDICT_GENERATED) + { let meta = FileCacheMeta::new( blob_file_path, blob_info.clone(), diff --git a/storage/src/device.rs b/storage/src/device.rs index cdefa5f77b8..6e6cbc15ed6 100644 --- a/storage/src/device.rs +++ b/storage/src/device.rs @@ -77,6 +77,8 @@ bitflags! { const CAP_TAR_TOC = 0x4000_0000; /// Rafs V5 image without extended blob table, this is an internal flag. const _V5_NO_EXT_BLOB_TABLE = 0x8000_0000; + /// Blob is generated with chunkdict. + const IS_CHUNKDICT_GENERATED = 0x0000_0200; } } @@ -172,6 +174,9 @@ pub struct BlobInfo { cipher_object: Arc, /// Cipher context for encryption. cipher_ctx: Option, + + /// is chunkdict generated + is_chunkdict_generated: bool, } impl BlobInfo { @@ -215,6 +220,8 @@ impl BlobInfo { meta_path: Arc::new(Mutex::new(String::new())), cipher_object: Default::default(), cipher_ctx: None, + + is_chunkdict_generated: false, }; blob_info.compute_features(); @@ -222,6 +229,16 @@ impl BlobInfo { blob_info } + /// Set the is_chunkdict_generated flag. + pub fn set_chunkdict_generated(&mut self, is_chunkdict_generated: bool) { + self.is_chunkdict_generated = is_chunkdict_generated; + } + + /// Get the is_chunkdict_generated flag. + pub fn is_chunkdict_generated(&self) -> bool { + self.is_chunkdict_generated + } + /// Get the blob index in the blob array. pub fn blob_index(&self) -> u32 { self.blob_index diff --git a/storage/src/meta/mod.rs b/storage/src/meta/mod.rs index ef892b56b97..9e9d40334c3 100644 --- a/storage/src/meta/mod.rs +++ b/storage/src/meta/mod.rs @@ -354,6 +354,15 @@ impl BlobCompressionContextHeader { ) } } + + /// Set flag indicating whether it's a blob for batch chunk or not. + pub fn set_is_chunkdict_generated(&mut self, enable: bool) { + if enable { + self.s_features |= BlobFeatures::IS_CHUNKDICT_GENERATED.bits(); + } else { + self.s_features &= !BlobFeatures::IS_CHUNKDICT_GENERATED.bits(); + } + } } /// Struct to manage blob chunk compression information, a wrapper over [BlobCompressionContext]. 
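`set_is_chunkdict_generated` above just toggles the IS_CHUNKDICT_GENERATED bit (0x0000_0200) inside the blob-features word, and the same set/clear pattern applies to any feature mask. A plain-u32 sketch of that toggle; the real code goes through the bitflags-generated `BlobFeatures` type rather than raw integers:

// Feature bit for chunkdict-generated blobs, mirroring the value added above.
const IS_CHUNKDICT_GENERATED: u32 = 0x0000_0200;

fn set_chunkdict_generated(features: &mut u32, enable: bool) {
    if enable {
        *features |= IS_CHUNKDICT_GENERATED;
    } else {
        *features &= !IS_CHUNKDICT_GENERATED;
    }
}

fn main() {
    let mut features = 0u32;
    set_chunkdict_generated(&mut features, true);
    assert_ne!(features & IS_CHUNKDICT_GENERATED, 0);
    set_chunkdict_generated(&mut features, false);
    assert_eq!(features & IS_CHUNKDICT_GENERATED, 0);
}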
@@ -850,7 +859,8 @@ impl BlobCompressionContextInfo { if u32::from_le(header.s_magic) != BLOB_CCT_MAGIC || u32::from_le(header.s_magic2) != BLOB_CCT_MAGIC - || u32::from_le(header.s_ci_entries) != blob_info.chunk_count() + || (!blob_info.has_feature(BlobFeatures::IS_CHUNKDICT_GENERATED) + && u32::from_le(header.s_ci_entries) != blob_info.chunk_count()) || u32::from_le(header.s_ci_compressor) != blob_info.meta_ci_compressor() as u32 || u64::from_le(header.s_ci_offset) != blob_info.meta_ci_offset() || u64::from_le(header.s_ci_compressed_size) != blob_info.meta_ci_compressed_size() @@ -886,8 +896,9 @@ impl BlobCompressionContextInfo { || blob_info.has_feature(BlobFeatures::BATCH) { return Err(einval!("invalid feature flags in blob meta header!")); - } else if info_size != (chunk_count as usize) * (size_of::()) - || (aligned_info_size as u64) > BLOB_CCT_V1_MAX_SIZE + } else if !blob_info.has_feature(BlobFeatures::IS_CHUNKDICT_GENERATED) + && (info_size != (chunk_count as usize) * (size_of::()) + || (aligned_info_size as u64) > BLOB_CCT_V1_MAX_SIZE) { return Err(einval!("uncompressed size in blob meta header is invalid!")); } @@ -1776,7 +1787,10 @@ impl BlobMetaChunkArray { ) -> Result<&'a T> { assert!(index < chunk_info_array.len()); let entry = &chunk_info_array[index]; - entry.validate(state)?; + // If the chunk belongs to a chunkdict, skip the validation check. + if state.blob_features & BlobFeatures::IS_CHUNKDICT_GENERATED.bits() == 0 { + entry.validate(state)?; + } Ok(entry) } } @@ -1993,6 +2007,9 @@ pub fn format_blob_features(features: BlobFeatures) -> String { if features.contains(BlobFeatures::ENCRYPTED) { output += "encrypted "; } + if features.contains(BlobFeatures::IS_CHUNKDICT_GENERATED) { + output += "is-chunkdict-generated "; + } output.trim_end().to_string() } diff --git a/utils/src/digest.rs b/utils/src/digest.rs index 26a6997a530..12e74486f3b 100644 --- a/utils/src/digest.rs +++ b/utils/src/digest.rs @@ -176,6 +176,18 @@ impl RafsDigest { } } + /// According to the format of sha256. + pub fn from_string(input: &str) -> Self { + let mut digest = RafsDigest::default(); + + for (i, byte) in input.as_bytes().chunks(2).enumerate() { + let hex_str = std::str::from_utf8(byte).unwrap(); + digest.data[i] = u8::from_str_radix(hex_str, 16).unwrap(); + } + + digest + } + pub fn hasher(algorithm: Algorithm) -> RafsDigestHasher { match algorithm { Algorithm::Blake3 => RafsDigestHasher::Blake3(Box::new(blake3::Hasher::new())),