From fc86651d6d0501f11159063f0a53b7bb79e3c8bf Mon Sep 17 00:00:00 2001 From: Nan Li Date: Tue, 26 Sep 2023 09:26:02 +0000 Subject: [PATCH 1/4] feat: implement `takeover` for nydusd fusedev daemon This patch implements the `save` and `restore` functions in the `fusedev_upgrade` in the service create. To do this, - This patch add a new create named `nydus-upgrade` into the workspace. The `nydus-upgrade` create has some util functions help to do serialization and deserialization for the rust structs using the versionize and snapshot crates. The crate also has a trait named `StorageBackend` which can be used to store and restore fuse session fds and state data for the upgrade action, and there's also an implementation named `UdsStorageBackend` which uses unix domain socket to do this. - as we have to use the same fuse session connection, backend file system mount commands, Vfs to re-mount the rafs for the new daemon (created for "hot upgrade" or failover), this patch add a new struct named `FusedevState` to hold these information. The `FusedevState` is serialized and stored into the `UdsStorageBackend` (which happens in the `save` function in the `fusedev_upgrade` module) before the new daemon is created, and the `FusedevState` is deserialized and restored from the `UdsStorageBackend` (which happens in the `restore` function in the `fusedev_upgrade` module) when the new daemon is triggered by `takeover`. Signed-off-by: Nan Li Signed-off-by: linan.loheagn3 --- Cargo.lock | 138 ++++++++++-- Cargo.toml | 39 +++- api/src/http.rs | 2 +- clib/Cargo.toml | 2 +- rafs/Cargo.toml | 12 +- service/Cargo.toml | 20 +- service/src/fs_service.rs | 18 +- service/src/fusedev.rs | 5 +- service/src/lib.rs | 8 +- service/src/upgrade.rs | 262 ++++++++++++++++++++-- storage/Cargo.toml | 25 ++- upgrade/Cargo.toml | 16 ++ upgrade/src/backend/mod.rs | 93 ++++++++ upgrade/src/backend/unix_domain_socket.rs | 52 +++++ upgrade/src/lib.rs | 6 + upgrade/src/persist.rs | 65 ++++++ 16 files changed, 689 insertions(+), 74 deletions(-) create mode 100644 upgrade/Cargo.toml create mode 100644 upgrade/src/backend/mod.rs create mode 100644 upgrade/src/backend/unix_domain_socket.rs create mode 100644 upgrade/src/lib.rs create mode 100644 upgrade/src/persist.rs diff --git a/Cargo.lock b/Cargo.lock index 522f5b06e7e..186b6583d7b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -120,6 +120,15 @@ version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a" +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -229,7 +238,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn", + "syn 1.0.98", ] [[package]] @@ -315,6 +324,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crc64" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55626594feae15d266d52440b26ff77de0e22230cf0c113abe619084c1ddc910" + [[package]] name = "crypto-common" version = "0.1.6" @@ -349,7 +364,7 @@ dependencies = [ "proc-macro2", "quote", "scratch", - "syn", + "syn 1.0.98", ] [[package]] @@ -366,7 +381,7 @@ checksum = "81bbeb29798b407ccd82a3324ade1a7286e0d29851475990b612670f6f5124d2" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.98", ] [[package]] @@ -524,8 +539,7 @@ dependencies = [ [[package]] name = "fuse-backend-rs" version = "0.10.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f85357722be4bf3d0b7548bedf7499686c77628c2c61cb99c6519463f7a9e5f0" +source = "git+https://github.com/loheagn/fuse-backend-rs.git?branch=vfs-persist#be366463fa66e2e675e32e79b3a5500e58f153d1" dependencies = [ "arc-swap", "bitflags 1.3.2", @@ -536,6 +550,9 @@ dependencies = [ "log", "mio", "nix", + "snapshot", + "versionize", + "versionize_derive", "vhost", "virtio-queue", "vm-memory", @@ -571,7 +588,7 @@ checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.98", ] [[package]] @@ -1286,6 +1303,7 @@ dependencies = [ "nydus-api", "nydus-rafs", "nydus-storage", + "nydus-upgrade", "nydus-utils", "rust-fsm", "serde", @@ -1294,6 +1312,8 @@ dependencies = [ "time", "tokio", "tokio-uring", + "versionize", + "versionize_derive", "vhost", "vhost-user-backend", "virtio-bindings", @@ -1342,6 +1362,17 @@ dependencies = [ "vmm-sys-util", ] +[[package]] +name = "nydus-upgrade" +version = "0.1.0" +dependencies = [ + "sendfd", + "snapshot", + "thiserror", + "versionize", + "versionize_derive", +] + [[package]] name = "nydus-utils" version = "0.4.3" @@ -1406,7 +1437,7 @@ checksum = "b501e44f11665960c7e7fcf062c7d96a14ade4aa98116c004b2e37b5be7d736c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.98", ] [[package]] @@ -1495,7 +1526,7 @@ checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.98", ] [[package]] @@ -1531,7 +1562,7 @@ dependencies = [ "proc-macro-error-attr", "proc-macro2", "quote", - "syn", + "syn 1.0.98", "version_check", ] @@ -1548,18 +1579,18 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.47" +version = "1.0.67" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ea3d908b0e36316caf9e9e2c4625cdde190a7e6f440d794667ed17a1855e725" +checksum = "3d433d9f1a3e8c1263d9456598b16fec66f4acc9a74dacffd35c7bb09b3a1328" dependencies = [ "unicode-ident", ] [[package]] name = "quote" -version = "1.0.20" +version = "1.0.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3bcdf212e9776fbcb2d23ab029360416bb1706b1aea2d1a5ba002727cbcab804" +checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" dependencies = [ "proc-macro2", ] @@ -1718,7 +1749,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "44237c429621e3606374941c3061fe95686bdaddb9b4f6524e4edc2d21da9c58" dependencies = [ "quote", - "syn", + "syn 1.0.98", ] [[package]] @@ -1806,6 +1837,15 @@ dependencies = [ "libc", ] +[[package]] +name = "sendfd" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "604b71b8fc267e13bb3023a2c901126c8f349393666a6d98ac1ae5729b701798" +dependencies = [ + "libc", +] + [[package]] name = "serde" version = "1.0.139" @@ -1823,7 +1863,7 @@ checksum = "dc1d3230c1de7932af58ad8ffbe1d784bd55efd5a9d84ac24f69c72d83543dfb" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.98", ] [[package]] @@ -1886,6 +1926,17 @@ version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2fd0db749597d91ff862fd1d55ea87f7855a744a8425a64695b6fca237d1dad1" +[[package]] +name = "snapshot" +version = "0.1.0" +source = "git+https://github.com/firecracker-microvm/firecracker?tag=v1.4.1#02dd0328589c59ae11b7ad13eaf412410f8e72e1" +dependencies = [ + "libc", + "thiserror", + "versionize", + "versionize_derive", +] + [[package]] name = "socket2" version = "0.4.4" @@ -1919,6 +1970,17 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "syn" +version = "2.0.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7303ef2c05cd654186cb250d29049a24840ca25d2747c25c0381c8d9e2f582e8" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "tar" version = "0.4.40" @@ -1954,22 +2016,22 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.31" +version = "1.0.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd829fe32373d27f76265620b5309d0340cb8550f523c1dda251d6298069069a" +checksum = "9d6d7a740b8a666a7e828dd00da9c0dc290dff53154ea77ac109281de90589b7" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.31" +version = "1.0.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0396bc89e626244658bef819e22d0cc459e795a5ebe878e6ec336d1674a8d79a" +checksum = "49922ecae66cc8a249b77e68d1d0623c1b2c514f0060c27cdc68bd62a1219d35" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.37", ] [[package]] @@ -2025,7 +2087,7 @@ checksum = "d266c00fde287f55d3f1c3e96c500c362a2b8c695076ec180f27918820bc6df8" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.98", ] [[package]] @@ -2101,7 +2163,7 @@ checksum = "11c75893af559bc8e10716548bdef5cb2b983f8e637db9d0e15126b61b484ee2" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.98", ] [[package]] @@ -2186,6 +2248,34 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "versionize" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dca4b7062e7e6d685901e815c35f9671e059de97c1c0905eeff8592f3fff442f" +dependencies = [ + "bincode", + "crc64", + "proc-macro2", + "quote", + "serde", + "serde_derive", + "syn 1.0.98", + "versionize_derive", + "vmm-sys-util", +] + +[[package]] +name = "versionize_derive" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c4971c07ef708cd51003222e509aaed8eae14aeb2907706b01578843195e03a" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.98", +] + [[package]] name = "vhost" version = "0.6.0" @@ -2289,7 +2379,7 @@ dependencies = [ "log", "proc-macro2", "quote", - "syn", + "syn 1.0.98", "wasm-bindgen-shared", ] @@ -2323,7 +2413,7 @@ checksum = "7d94ac45fcf608c1f45ef53e748d35660f168490c10b23704c7779ab8f5c3048" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.98", "wasm-bindgen-backend", "wasm-bindgen-shared", ] diff --git a/Cargo.toml b/Cargo.toml index c85a07a741b..ce257ac9474 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,7 +35,7 @@ path = "src/lib.rs" anyhow = "1" clap = { version = "4.0.18", features = ["derive", "cargo"] } flexi_logger = { version = "0.25", features = ["compress"] } -fuse-backend-rs = "^0.10.4" +fuse-backend-rs = { git = "https://github.com/loheagn/fuse-backend-rs.git", branch = "vfs-persist" } hex = "0.4.3" hyper = "0.14.11" hyperlocal = "0.8.0" @@ -57,16 +57,25 @@ openssl = { version = "0.10.55", features = ["vendored"] } # pin openssl-src to bring in fix for https://rustsec.org/advisories/RUSTSEC-2022-0032 #openssl-src = { version = "111.22" } -nydus-api = { version = "0.3.0", path = "api", features = ["error-backtrace", "handler"] } +nydus-api = { version = "0.3.0", path = "api", features = [ + "error-backtrace", + "handler", +] } nydus-builder = { version = "0.1.0", path = "builder" } nydus-rafs = { version = "0.3.1", path = "rafs" } -nydus-service = { version = "0.3.0", path = "service", features = ["block-device"] } -nydus-storage = { version = "0.6.3", path = "storage", features = ["prefetch-rate-limit"] } +nydus-service = { version = "0.3.0", path = "service", features = [ + "block-device", +] } +nydus-storage = { version = "0.6.3", path = "storage", features = [ + "prefetch-rate-limit", +] } nydus-utils = { version = "0.4.2", path = "utils" } vhost = { version = "0.6.0", features = ["vhost-user-slave"], optional = true } vhost-user-backend = { version = "0.8.0", optional = true } -virtio-bindings = { version = "0.1", features = ["virtio-v5_0_0"], optional = true } +virtio-bindings = { version = "0.1", features = [ + "virtio-v5_0_0", +], optional = true } virtio-queue = { version = "0.7.0", optional = true } vm-memory = { version = "0.10.0", features = ["backend-mmap"], optional = true } vmm-sys-util = { version = "0.11.0", optional = true } @@ -96,15 +105,25 @@ virtiofs = [ "vm-memory", "vmm-sys-util", ] -block-nbd = [ - "nydus-service/block-nbd" -] +block-nbd = ["nydus-service/block-nbd"] backend-http-proxy = ["nydus-storage/backend-http-proxy"] -backend-localdisk = ["nydus-storage/backend-localdisk", "nydus-storage/backend-localdisk-gpt"] +backend-localdisk = [ + "nydus-storage/backend-localdisk", + "nydus-storage/backend-localdisk-gpt", +] backend-oss = ["nydus-storage/backend-oss"] backend-registry = ["nydus-storage/backend-registry"] backend-s3 = ["nydus-storage/backend-s3"] [workspace] -members = ["api", "builder", "clib", "rafs", "storage", "service", "utils"] \ No newline at end of file +members = [ + "api", + "builder", + "clib", + "rafs", + "storage", + "service", + "upgrade", + "utils", +] diff --git a/api/src/http.rs b/api/src/http.rs index c37645bba63..71d66f0bd81 100644 --- a/api/src/http.rs +++ b/api/src/http.rs @@ -132,7 +132,7 @@ pub enum DaemonErrorKind { /// Unexpected event type. UnexpectedEvent(String), /// Can't upgrade the daemon. - UpgradeManager, + UpgradeManager(String), /// Unsupported requests. Unsupported, } diff --git a/clib/Cargo.toml b/clib/Cargo.toml index 34583b211d5..fe5892f8dd2 100644 --- a/clib/Cargo.toml +++ b/clib/Cargo.toml @@ -15,7 +15,7 @@ crate-type = ["cdylib", "staticlib"] [dependencies] libc = "0.2.137" log = "0.4.17" -fuse-backend-rs = "^0.10.3" +fuse-backend-rs = { git = "https://github.com/loheagn/fuse-backend-rs.git", branch = "vfs-persist" } nydus-api = { version = "0.3", path = "../api" } nydus-rafs = { version = "0.3.1", path = "../rafs" } nydus-storage = { version = "0.6.3", path = "../storage" } diff --git a/rafs/Cargo.toml b/rafs/Cargo.toml index 7424a811172..acfcf5fd2b7 100644 --- a/rafs/Cargo.toml +++ b/rafs/Cargo.toml @@ -19,11 +19,13 @@ nix = "0.24" serde = { version = "1.0.110", features = ["serde_derive", "rc"] } serde_json = "1.0.53" vm-memory = "0.10" -fuse-backend-rs = "^0.10.3" +fuse-backend-rs = { git = "https://github.com/loheagn/fuse-backend-rs.git", branch = "vfs-persist" } thiserror = "1" nydus-api = { version = "0.3", path = "../api" } -nydus-storage = { version = "0.6", path = "../storage", features = ["backend-localfs"] } +nydus-storage = { version = "0.6", path = "../storage", features = [ + "backend-localfs", +] } nydus-utils = { version = "0.4", path = "../utils" } [dev-dependencies] @@ -37,4 +39,8 @@ vhost-user-fs = ["fuse-backend-rs/vhost-user-fs"] [package.metadata.docs.rs] all-features = true -targets = ["x86_64-unknown-linux-gnu", "aarch64-unknown-linux-gnu", "aarch64-apple-darwin"] +targets = [ + "x86_64-unknown-linux-gnu", + "aarch64-unknown-linux-gnu", + "aarch64-apple-darwin", +] diff --git a/service/Cargo.toml b/service/Cargo.toml index 4d0497c0d30..5714960b029 100644 --- a/service/Cargo.toml +++ b/service/Cargo.toml @@ -12,7 +12,9 @@ resolver = "2" [dependencies] bytes = { version = "1", optional = true } dbs-allocator = { version = "0.1.1", optional = true } -fuse-backend-rs = "^0.10.3" +fuse-backend-rs = { git = "https://github.com/loheagn/fuse-backend-rs.git", branch = "vfs-persist", features = [ + "persist", +] } libc = "0.2" log = "0.4.8" mio = { version = "0.8", features = ["os-poll", "os-ext"] } @@ -23,15 +25,20 @@ serde_json = "1.0.51" thiserror = "1.0" time = { version = "0.3.14", features = ["serde-human-readable"] } tokio = { version = "1.24", features = ["macros"] } +versionize_derive = "0.1.6" +versionize = "0.1.10" -nydus-api = { version = "0.3.0", path = "../api"} +nydus-api = { version = "0.3.0", path = "../api" } nydus-rafs = { version = "0.3.1", path = "../rafs" } nydus-storage = { version = "0.6.3", path = "../storage" } +nydus-upgrade = { version = "0.1.0", path = "../upgrade" } nydus-utils = { version = "0.4.2", path = "../utils" } vhost = { version = "0.6.0", features = ["vhost-user-slave"], optional = true } vhost-user-backend = { version = "0.8.0", optional = true } -virtio-bindings = { version = "0.1", features = ["virtio-v5_0_0"], optional = true } +virtio-bindings = { version = "0.1", features = [ + "virtio-v5_0_0", +], optional = true } virtio-queue = { version = "0.7.0", optional = true } vm-memory = { version = "0.10.0", features = ["backend-mmap"], optional = true } @@ -52,10 +59,7 @@ virtiofs = [ "virtio-bindings", ] -block-device = [ "dbs-allocator", "tokio/fs"] +block-device = ["dbs-allocator", "tokio/fs"] block-nbd = ["block-device", "bytes"] -coco = [ - "fuse-backend-rs/fusedev", - "nydus-storage/backend-registry", -] +coco = ["fuse-backend-rs/fusedev", "nydus-storage/backend-registry"] diff --git a/service/src/fs_service.rs b/service/src/fs_service.rs index d4b9c2fa34d..94c45699cd9 100644 --- a/service/src/fs_service.rs +++ b/service/src/fs_service.rs @@ -13,6 +13,7 @@ use std::path::{Path, PathBuf}; use std::str::FromStr; use std::sync::{Arc, MutexGuard}; +use fuse_backend_rs::api::vfs::VfsError; use fuse_backend_rs::api::{BackFileSystem, Vfs}; #[cfg(target_os = "linux")] use fuse_backend_rs::passthrough::{Config, PassthroughFs}; @@ -21,12 +22,14 @@ use nydus_rafs::fs::Rafs; use nydus_rafs::{RafsError, RafsIoRead}; use nydus_storage::factory::BLOB_FACTORY; use serde::{Deserialize, Serialize}; +use versionize::{VersionMap, Versionize, VersionizeResult}; +use versionize_derive::Versionize; use crate::upgrade::UpgradeManager; use crate::{Error, FsBackendDescriptor, FsBackendType, Result}; /// Request structure to mount a filesystem instance. -#[derive(Clone)] +#[derive(Clone, Versionize)] pub struct FsBackendMountCmd { /// Filesystem type. pub fs_type: FsBackendType, @@ -122,6 +125,7 @@ pub trait FsService: Send + Sync { "failed to add filesystem instance to upgrade manager, {}", e ); + mgr_guard.disable_upgrade(); warn!("disable online upgrade due to inconsistent status!!!"); } } @@ -162,6 +166,7 @@ pub trait FsService: Send + Sync { "failed to update filesystem instance to upgrade manager, {}", e ); + mgr_guard.disable_upgrade(); warn!("disable online upgrade due to inconsistent status!!!"); } } @@ -169,6 +174,17 @@ pub trait FsService: Send + Sync { Ok(()) } + /// Restore a filesystem instance. + fn restore_mount(&self, cmd: &FsBackendMountCmd, vfs_index: u8) -> Result<()> { + let backend = fs_backend_factory(cmd)?; + self.get_vfs() + .restore_mount(backend, vfs_index, &cmd.mountpoint) + .map_err(VfsError::RestoreMount)?; + self.backend_collection().add(&cmd.mountpoint, &cmd)?; + info!("backend fs restored at {}", cmd.mountpoint); + Ok(()) + } + /// Umount a filesystem instance. fn umount(&self, cmd: FsBackendUmountCmd) -> Result<()> { let _ = self diff --git a/service/src/fusedev.rs b/service/src/fusedev.rs index e70151647a8..d36341cb0ef 100644 --- a/service/src/fusedev.rs +++ b/service/src/fusedev.rs @@ -145,10 +145,9 @@ impl FuseServer { } } -struct FusedevFsService { +pub struct FusedevFsService { /// Fuse connection ID which usually equals to `st_dev` pub conn: AtomicU64, - #[allow(dead_code)] pub failover_policy: FailoverPolicy, pub session: Mutex, @@ -254,7 +253,7 @@ pub struct FusedevDaemon { result_receiver: Mutex>>, service: Arc, state: AtomicI32, - supervisor: Option, + pub supervisor: Option, threads_cnt: u32, state_machine_thread: Mutex>>>, fuse_service_threads: Mutex>>>, diff --git a/service/src/lib.rs b/service/src/lib.rs index 976ca6d2bf9..8e47b27b2fd 100644 --- a/service/src/lib.rs +++ b/service/src/lib.rs @@ -29,6 +29,8 @@ use nydus_api::{ConfigV2, DaemonErrorKind}; use nydus_rafs::RafsError; use serde::{Deserialize, Serialize}; use serde_json::Error as SerdeError; +use versionize::{VersionMap, Versionize, VersionizeError, VersionizeResult}; +use versionize_derive::Versionize; pub mod daemon; mod fs_service; @@ -79,7 +81,7 @@ pub enum Error { ChannelSend(#[from] SendError), #[error("failed to receive message from channel, {0}")] ChannelReceive(#[from] RecvError), - #[error(transparent)] + #[error("failed to upgrade nydusd daemon, {0}")] UpgradeManager(upgrade::UpgradeMgrError), #[error("failed to start service, {0}")] StartService(String), @@ -134,7 +136,7 @@ impl From for DaemonErrorKind { fn from(e: Error) -> Self { use Error::*; match e { - UpgradeManager(_) => DaemonErrorKind::UpgradeManager, + UpgradeManager(e) => DaemonErrorKind::UpgradeManager(format!("{:?}", e)), NotReady => DaemonErrorKind::NotReady, Unsupported => DaemonErrorKind::Unsupported, Serde(e) => DaemonErrorKind::Serde(e), @@ -148,7 +150,7 @@ impl From for DaemonErrorKind { pub type Result = std::result::Result; /// Type of supported backend filesystems. -#[derive(Clone, Debug, Serialize, PartialEq, Deserialize)] +#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, Versionize)] pub enum FsBackendType { /// Registry Accelerated File System Rafs, diff --git a/service/src/upgrade.rs b/service/src/upgrade.rs index 7c55433af91..afae3506450 100644 --- a/service/src/upgrade.rs +++ b/service/src/upgrade.rs @@ -4,15 +4,40 @@ //! Online upgrade manager for Nydus daemons and filesystems. +use std::any::TypeId; +use std::collections::HashMap; use std::convert::{TryFrom, TryInto}; +use std::io; +use std::os::fd::RawFd; use std::path::PathBuf; +use std::sync::atomic::{AtomicBool, Ordering}; + +use nydus_upgrade::backend::unix_domain_socket::UdsStorageBackend; +use nydus_upgrade::backend::{StorageBackend, StorageBackendErr}; use crate::fs_service::{FsBackendMountCmd, FsBackendUmountCmd}; -use crate::Result; +use crate::{Error, Result}; /// Error codes related to upgrade manager. #[derive(thiserror::Error, Debug)] -pub enum UpgradeMgrError {} +pub enum UpgradeMgrError { + #[error("missing supervisor path")] + MissingSupervisorPath, + + #[error("failed to save/restore data via the backend, {0}")] + StorageBackendError(StorageBackendErr), + + #[error("failed to serialize, {0}")] + Serialize(io::Error), + #[error("failed to deserialize, {0}")] + Deserialize(io::Error), +} + +impl From for Error { + fn from(e: UpgradeMgrError) -> Self { + Error::UpgradeManager(e) + } +} /// FUSE fail-over policies. #[derive(PartialEq, Eq, Debug)] @@ -43,46 +68,255 @@ impl TryFrom<&String> for FailoverPolicy { } } +const MAX_STATE_DATA_LENGTH: usize = 1024 * 32; + /// Online upgrade manager. -pub struct UpgradeManager {} +pub struct UpgradeManager { + // backend_mount_cmd_map records the mount command of each backend filesystem. + // the structure is: mountpoint -> (FsBackendMountCmd, vfs_index) + backend_mount_cmd_map: HashMap, + fds: Vec, + backend: Box, + + disabled: AtomicBool, +} impl UpgradeManager { /// Create a new instance of [UpgradeManager]. - pub fn new(_: PathBuf) -> Self { - UpgradeManager {} + pub fn new(socket_path: PathBuf) -> Self { + UpgradeManager { + backend_mount_cmd_map: HashMap::new(), + backend: Box::new(UdsStorageBackend::new(socket_path)), + fds: Vec::new(), + disabled: AtomicBool::new(false), + } } /// Add a filesystem instance into the upgrade manager. - pub fn add_mounts_state(&mut self, _cmd: FsBackendMountCmd, _vfs_index: u8) -> Result<()> { + pub fn add_mounts_state(&mut self, cmd: FsBackendMountCmd, vfs_index: u8) -> Result<()> { + if self.disabled.load(Ordering::Acquire) { + return Err(Error::Unsupported); + } + + let cmd_map = &mut self.backend_mount_cmd_map; + if cmd_map.contains_key(&cmd.mountpoint) { + return Err(Error::AlreadyExists); + } + + cmd_map.insert(cmd.mountpoint.clone(), (cmd, vfs_index)); Ok(()) } /// Update a filesystem instance in the upgrade manager. - pub fn update_mounts_state(&mut self, _cmd: FsBackendMountCmd) -> Result<()> { - Ok(()) + pub fn update_mounts_state(&mut self, cmd: FsBackendMountCmd) -> Result<()> { + if self.disabled.load(Ordering::Acquire) { + return Err(Error::Unsupported); + } + + let cmd_map = &mut self.backend_mount_cmd_map; + match cmd_map.get_mut(&cmd.mountpoint) { + Some(cmd_with_vfs_idx) => { + cmd_with_vfs_idx.0 = cmd; + Ok(()) + } + None => Err(Error::NotFound), + } } /// Remove a filesystem instance from the upgrade manager. - pub fn remove_mounts_state(&mut self, _cmd: FsBackendUmountCmd) -> Result<()> { - Ok(()) + pub fn remove_mounts_state(&mut self, cmd: FsBackendUmountCmd) -> Result<()> { + if self.disabled.load(Ordering::Acquire) { + return Err(Error::Unsupported); + } + + let cmd_map = &mut self.backend_mount_cmd_map; + match cmd_map.get_mut(&cmd.mountpoint) { + Some(_) => { + cmd_map.remove(&cmd.mountpoint); + Ok(()) + } + None => Err(Error::NotFound), + } } /// Disable online upgrade capability. - pub fn disable_upgrade(&mut self) {} + pub fn disable_upgrade(&mut self) { + self.disabled.store(true, Ordering::Release); + } + + /// Save the fuse fds and fuse state data for online upgrade. + fn save(&mut self, data: &Vec) -> Result<()> { + self.backend + .save(&self.fds, data) + .map_err(UpgradeMgrError::StorageBackendError)?; + Ok(()) + } + + /// Restore the fuse fds and fuse state data for online upgrade. + fn restore(&mut self) -> Result> { + let mut fds = vec![0 as RawFd; 8]; + let mut state_data = vec![0u8; MAX_STATE_DATA_LENGTH]; + let (state_data_len, fds_len) = self + .backend + .restore(&mut fds, &mut state_data) + .map_err(UpgradeMgrError::StorageBackendError)?; + fds.truncate(fds_len); + state_data.truncate(state_data_len); + + self.fds = fds; + Ok(state_data) + } + + fn add_fd(&mut self, fd: RawFd) { + self.fds.push(fd); + } } /// Online upgrade utilities for FUSE daemon. pub mod fusedev_upgrade { + use std::fs::File; + use std::os::fd::{FromRawFd, RawFd}; + use std::os::unix::io::AsRawFd; + use std::sync::atomic::Ordering; + + use nydus_upgrade::persist::Snapshotter; + use versionize::{VersionMap, Versionize, VersionizeResult}; + use versionize_derive::Versionize; + use super::*; - use crate::fusedev::FusedevDaemon; + use crate::daemon::NydusDaemon; + use crate::fusedev::{FusedevDaemon, FusedevFsService}; + + #[derive(Versionize, Clone, Default)] + struct FusedevState { + backend_fs_mount_cmd_list: Vec<(FsBackendMountCmd, u8)>, + vfs_state_data: Vec, + fuse_conn_id: u64, + } + + impl Snapshotter for FusedevState { + fn get_versions() -> Vec> { + vec![ + // version 1 + HashMap::from([(FusedevState::type_id(), 1)]), + // more versions for the future + ] + } + } /// Save state information for a FUSE daemon. - pub fn save(_daemon: &FusedevDaemon) -> Result<()> { + pub fn save(daemon: &FusedevDaemon) -> Result<()> { + let svc = daemon.get_default_fs_service().ok_or(Error::NotFound)?; + + if !svc.get_vfs().initialized() { + return Err(Error::NotReady); + } + + let mut mgr = svc.upgrade_mgr().unwrap(); + + // set fd + let fd = svc + .as_ref() + .as_any() + .downcast_ref::() + .unwrap() + .session + .lock() + .unwrap() + .get_fuse_file() + .unwrap() + .as_raw_fd(); + mgr.add_fd(fd); + + // save vfs state + let vfs_state_data = svc.get_vfs().save_to_bytes().map_err(|e| { + let io_err = io::Error::new( + io::ErrorKind::Other, + format!("Failed to save vfs state: {:?}", e), + ); + UpgradeMgrError::Serialize(io_err) + })?; + + let backend_fs_mount_cmd_list = mgr + .backend_mount_cmd_map + .iter() + .map(|(_, (cmd, vfs_idx))| (cmd.clone(), *vfs_idx)) + .collect(); + + let fuse_conn_id = svc + .as_any() + .downcast_ref::() + .unwrap() + .conn + .load(Ordering::Acquire); + + let state = FusedevState { + backend_fs_mount_cmd_list, + vfs_state_data, + fuse_conn_id, + }; + let state = state.save().map_err(UpgradeMgrError::Serialize)?; + + // save the mgr state via the backend in the mgr + mgr.save(&state)?; + Ok(()) } /// Restore state information for a FUSE daemon. - pub fn restore(_daemon: &FusedevDaemon) -> Result<()> { + pub fn restore(daemon: &FusedevDaemon) -> Result<()> { + if daemon.supervisor.is_none() { + return Err(UpgradeMgrError::MissingSupervisorPath.into()); + } + + let svc = daemon.get_default_fs_service().ok_or(Error::NotFound)?; + + let mut mgr = svc.upgrade_mgr().unwrap(); + + // restore the mgr state via the backend in the mgr + let mut state_data = mgr.restore()?; + + let mut state = + FusedevState::restore(&mut state_data).map_err(UpgradeMgrError::Deserialize)?; + + // restore the backend fs mount cmd map + state + .backend_fs_mount_cmd_list + .iter() + .for_each(|(cmd, vfs_idx)| { + mgr.backend_mount_cmd_map + .insert(cmd.mountpoint.clone(), (cmd.clone(), *vfs_idx)); + }); + + // restore the fuse daemon + svc.as_any() + .downcast_ref::() + .unwrap() + .conn + .store(state.fuse_conn_id, Ordering::Release); + + // restore fuse fd + svc.as_any() + .downcast_ref::() + .unwrap() + .session + .lock() + .unwrap() + .set_fuse_file(unsafe { File::from_raw_fd(mgr.fds[0] as RawFd) }); + + // restore vfs + svc.get_vfs() + .restore_from_bytes(&mut state.vfs_state_data)?; + state + .backend_fs_mount_cmd_list + .iter() + .try_for_each(|(cmd, vfs_idx)| -> Result<()> { + svc.restore_mount(cmd, *vfs_idx)?; + // as we are in upgrade stage and obtain the lock, `unwrap` is safe here + mgr.add_mounts_state(cmd.clone(), *vfs_idx).unwrap(); + Ok(()) + })?; + Ok(()) } } diff --git a/storage/Cargo.toml b/storage/Cargo.toml index b35840dad26..78373874156 100644 --- a/storage/Cargo.toml +++ b/storage/Cargo.toml @@ -16,8 +16,8 @@ hex = "0.4.3" hmac = { version = "0.12.1", optional = true } http = { version = "0.2.8", optional = true } httpdate = { version = "1.0", optional = true } -hyper = {version = "0.14.11", optional = true } -hyperlocal = {version = "0.8.0", optional = true } +hyper = { version = "0.14.11", optional = true } +hyperlocal = { version = "0.8.0", optional = true } lazy_static = "1.4.0" leaky-bucket = { version = "0.12.1", optional = true } libc = "0.2" @@ -33,14 +33,23 @@ sha1 = { version = "0.10.5", optional = true } sha2 = { version = "0.10.2", optional = true } tar = "0.4.40" time = { version = "0.3.14", features = ["formatting"], optional = true } -tokio = { version = "1.19.0", features = ["macros", "rt", "rt-multi-thread", "sync", "time"] } +tokio = { version = "1.19.0", features = [ + "macros", + "rt", + "rt-multi-thread", + "sync", + "time", +] } url = { version = "2.1.1", optional = true } vm-memory = "0.10" -fuse-backend-rs = "^0.10.3" +fuse-backend-rs = { git = "https://github.com/loheagn/fuse-backend-rs.git", branch = "vfs-persist" } gpt = { version = "3.1.0", optional = true } nydus-api = { version = "0.3", path = "../api" } -nydus-utils = { version = "0.4", path = "../utils", features = ["encryption", "zran"] } +nydus-utils = { version = "0.4", path = "../utils", features = [ + "encryption", + "zran", +] } [dev-dependencies] vmm-sys-util = "0.11" @@ -62,4 +71,8 @@ prefetch-rate-limit = ["leaky-bucket"] [package.metadata.docs.rs] all-features = true -targets = ["x86_64-unknown-linux-gnu", "aarch64-unknown-linux-gnu", "aarch64-apple-darwin"] +targets = [ + "x86_64-unknown-linux-gnu", + "aarch64-unknown-linux-gnu", + "aarch64-apple-darwin", +] diff --git a/upgrade/Cargo.toml b/upgrade/Cargo.toml new file mode 100644 index 00000000000..ee78c92cabc --- /dev/null +++ b/upgrade/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "nydus-upgrade" +version = "0.1.0" +description = "Nydus Daemon Upgrade" +authors = ["The Nydus Developers"] +license = "Apache-2.0" +homepage = "https://nydus.dev/" +repository = "https://github.com/dragonflyoss/image-service" +edition = "2021" + +[dependencies] +sendfd = "0.4.3" +snapshot = { git = "https://github.com/firecracker-microvm/firecracker", tag = "v1.4.1" } +thiserror = "1" +versionize_derive = "0.1.6" +versionize = "0.1.10" diff --git a/upgrade/src/backend/mod.rs b/upgrade/src/backend/mod.rs new file mode 100644 index 00000000000..6e85e4469db --- /dev/null +++ b/upgrade/src/backend/mod.rs @@ -0,0 +1,93 @@ +use std::{io, os::fd::RawFd}; + +pub mod unix_domain_socket; + +#[derive(thiserror::Error, Debug)] +pub enum StorageBackendErr { + #[error("failed to create UnixStream, {0}")] + CreateUnixStream(io::Error), + #[error("failed to send fd over UnixStream, {0}")] + SendFd(io::Error), + #[error("failed to receive fd over UnixStream, {0}")] + RecvFd(io::Error), + #[error("no enough fds")] + NoEnoughFds, +} + +pub type Result = std::result::Result; + +/// StorageBackend trait is used to save and restore the fuse fds and fuse state data for online upgrade. +pub trait StorageBackend: Send + Sync { + /// Save the fuse fds and fuse state data for online upgrade. + /// Returns the length of bytes of fuse state data. + fn save(&mut self, fds: &[RawFd], data: &[u8]) -> Result; + + /// Restore the fuse fds and fuse state data for online upgrade. + /// Returns the length of bytes of fuse state data and the length of fds. + fn restore(&mut self, fds: &mut Vec, data: &mut Vec) -> Result<(usize, usize)>; +} + +#[cfg(test)] +mod test { + + #[test] + fn test_storage_backend() { + use std::os::fd::RawFd; + + use crate::backend::{Result, StorageBackend}; + + #[derive(Default)] + struct TestStorageBackend { + fds: Vec, + data: Vec, + } + + impl StorageBackend for TestStorageBackend { + fn save(&mut self, fds: &[RawFd], data: &[u8]) -> Result { + self.fds = Vec::new(); + fds.iter().for_each(|fd| self.fds.push(*fd)); + + self.data = vec![0u8; data.len()]; + self.data.clone_from_slice(data); + + Ok(self.data.len()) + } + + fn restore( + &mut self, + fds: &mut Vec, + data: &mut Vec, + ) -> Result<(usize, usize)> { + fds.truncate(self.fds.len()); + fds.copy_from_slice(&self.fds); + + data.truncate(self.data.len()); + data.copy_from_slice(&self.data); + + Ok((data.len(), fds.len())) + } + } + + const FDS_LEN: usize = 10; + const DATA_LEN: usize = 5; + let fds = [5 as RawFd; FDS_LEN]; + let data: [u8; DATA_LEN] = [7, 8, 9, 10, 12]; + + let mut backend: Box = Box::new(TestStorageBackend::default()); + let saved_data_len = backend.save(&fds, &data).unwrap(); + assert_eq!(saved_data_len, DATA_LEN); + + let mut restored_fds = vec![0 as RawFd; 100]; + let mut restored_data = vec![0 as u8; 100]; + let (restored_data_len, restored_fds_len) = backend + .restore(&mut restored_fds, &mut restored_data) + .unwrap(); + assert_eq!(restored_data_len, DATA_LEN); + assert_eq!(restored_fds_len, FDS_LEN); + + restored_data.truncate(restored_data_len); + restored_fds.truncate(restored_fds_len); + assert_eq!(restored_data, data); + assert_eq!(restored_fds, fds); + } +} diff --git a/upgrade/src/backend/unix_domain_socket.rs b/upgrade/src/backend/unix_domain_socket.rs new file mode 100644 index 00000000000..18f727b93d1 --- /dev/null +++ b/upgrade/src/backend/unix_domain_socket.rs @@ -0,0 +1,52 @@ +// Copyright 2023 Nydus Developers. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + +use std::{ + os::{fd::RawFd, unix::net::UnixStream}, + path::PathBuf, +}; + +use sendfd::{RecvWithFd, SendWithFd}; + +use super::{Result, StorageBackend, StorageBackendErr}; + +pub struct UdsStorageBackend { + socket_path: PathBuf, +} + +impl UdsStorageBackend { + pub fn new(socket_path: PathBuf) -> Self { + UdsStorageBackend { socket_path } + } +} + +impl StorageBackend for UdsStorageBackend { + fn save(&mut self, fds: &[RawFd], data: &[u8]) -> Result { + if fds.len() < 1 { + return Err(StorageBackendErr::NoEnoughFds); + } + + let socket = + UnixStream::connect(&self.socket_path).map_err(StorageBackendErr::CreateUnixStream)?; + let len = socket + .send_with_fd(data, fds) + .map_err(StorageBackendErr::SendFd)?; + + Ok(len) + } + + fn restore(&mut self, fds: &mut Vec, data: &mut Vec) -> Result<(usize, usize)> { + let socket = + UnixStream::connect(&self.socket_path).map_err(StorageBackendErr::CreateUnixStream)?; + let len_pair = socket + .recv_with_fd(data, fds) + .map_err(StorageBackendErr::RecvFd)?; + + if fds.len() < 1 { + return Err(StorageBackendErr::NoEnoughFds); + } + + Ok(len_pair) + } +} diff --git a/upgrade/src/lib.rs b/upgrade/src/lib.rs new file mode 100644 index 00000000000..2436b487aaf --- /dev/null +++ b/upgrade/src/lib.rs @@ -0,0 +1,6 @@ +// Copyright 2023 Nydus Developers. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + +pub mod backend; +pub mod persist; diff --git a/upgrade/src/persist.rs b/upgrade/src/persist.rs new file mode 100644 index 00000000000..adced9b1d52 --- /dev/null +++ b/upgrade/src/persist.rs @@ -0,0 +1,65 @@ +// Copyright 2023 Nydus Developers. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + +use std::io::{Error as IoError, ErrorKind, Result}; +use std::{any::TypeId, collections::HashMap}; + +use snapshot::Snapshot; +use versionize::{VersionMap, Versionize}; + +/// A list of versions. +type Versions = Vec>; + +/// A trait for snapshotting. +/// This trait is used to save and restore a struct +/// which implements `versionize::Versionize`. +pub trait Snapshotter: Versionize + Sized { + /// Returns a list of versions. + fn get_versions() -> Versions; + + /// Returns a `VersionMap` with the versions defined by `get_versions`. + fn new_version_map() -> VersionMap { + let mut version_map = VersionMap::new(); + for (idx, map) in Self::get_versions().into_iter().enumerate() { + if idx > 0 { + version_map.new_version(); + } + for (type_id, version) in map { + version_map.set_type_version(type_id, version); + } + } + version_map + } + + /// Returns a new `Snapshot` with the versions defined by `get_versions`. + fn new_snapshot() -> Snapshot { + let vm = Self::new_version_map(); + let target_version = vm.latest_version(); + Snapshot::new(vm, target_version) + } + + /// Saves the struct to a `Vec`. + fn save(&self) -> Result> { + let mut buf = Vec::new(); + let mut snapshot = Self::new_snapshot(); + snapshot.save(&mut buf, self).map_err(|e| { + IoError::new( + ErrorKind::Other, + format!("Failed to save snapshot: {:?}", e), + ) + })?; + + Ok(buf) + } + + /// Restores the struct from a `Vec`. + fn restore(buf: &mut Vec) -> Result { + Snapshot::load(&mut buf.as_slice(), buf.len(), Self::new_version_map()).map_err(|e| { + IoError::new( + ErrorKind::Other, + format!("Failed to load snapshot: {:?}", e), + ) + }) + } +} From d9853f174ccf8ff8f26b55d76db870d6a0aea3b2 Mon Sep 17 00:00:00 2001 From: Xin Yin Date: Sun, 8 Oct 2023 09:50:22 +0800 Subject: [PATCH 2/4] feat: support takeover for fscache refine the UpgradeManager, make it can also support store status for fscache daemon. And make the takeover feature applies to both fuse and fscache mode. Signed-off-by: Xin Yin --- Cargo.lock | 31 +- Cargo.toml | 2 +- api/src/config.rs | 2 +- clib/Cargo.toml | 2 +- rafs/Cargo.toml | 2 +- rafs/src/fs.rs | 15 +- service/Cargo.toml | 4 +- service/src/daemon.rs | 7 +- service/src/fs_cache.rs | 46 ++- service/src/fs_service.rs | 27 +- service/src/fusedev.rs | 23 ++ service/src/singleton.rs | 137 +++++-- service/src/upgrade.rs | 457 +++++++++++++++------- src/bin/nydusd/api_server_glue.rs | 7 + src/bin/nydusd/main.rs | 4 +- storage/Cargo.toml | 2 +- upgrade/src/backend/mod.rs | 39 +- upgrade/src/backend/unix_domain_socket.rs | 18 +- 18 files changed, 573 insertions(+), 252 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 186b6583d7b..83915ecb19e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -393,6 +393,19 @@ dependencies = [ "thiserror", ] +[[package]] +name = "dbs-snapshot" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf0c82c41414e93c765f9fb84d5531a836f6a0855c5e1c7fcf6cd2c20c85f6e" +dependencies = [ + "displaydoc", + "libc", + "thiserror", + "versionize", + "versionize_derive", +] + [[package]] name = "dbs-uhttp" version = "0.3.2" @@ -414,6 +427,17 @@ dependencies = [ "subtle", ] +[[package]] +name = "displaydoc" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.37", +] + [[package]] name = "encoding_rs" version = "0.8.31" @@ -538,19 +562,20 @@ dependencies = [ [[package]] name = "fuse-backend-rs" -version = "0.10.5" -source = "git+https://github.com/loheagn/fuse-backend-rs.git?branch=vfs-persist#be366463fa66e2e675e32e79b3a5500e58f153d1" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e5a63a89f40ec26a0a1434e89de3f4ee939a920eae15d641053ee09ee6ed44b" dependencies = [ "arc-swap", "bitflags 1.3.2", "caps", "core-foundation-sys", + "dbs-snapshot", "lazy_static", "libc", "log", "mio", "nix", - "snapshot", "versionize", "versionize_derive", "vhost", diff --git a/Cargo.toml b/Cargo.toml index ce257ac9474..69de7eae25d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,7 +35,7 @@ path = "src/lib.rs" anyhow = "1" clap = { version = "4.0.18", features = ["derive", "cargo"] } flexi_logger = { version = "0.25", features = ["compress"] } -fuse-backend-rs = { git = "https://github.com/loheagn/fuse-backend-rs.git", branch = "vfs-persist" } +fuse-backend-rs = "^0.11.0" hex = "0.4.3" hyper = "0.14.11" hyperlocal = "0.8.0" diff --git a/api/src/config.rs b/api/src/config.rs index 5f8ce55c183..0dd77fd0d8f 100644 --- a/api/src/config.rs +++ b/api/src/config.rs @@ -1070,7 +1070,7 @@ pub const BLOB_CACHE_TYPE_META_BLOB: &str = "bootstrap"; pub const BLOB_CACHE_TYPE_DATA_BLOB: &str = "datablob"; /// Configuration information for a cached blob. -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Deserialize, Serialize, Clone)] pub struct BlobCacheEntry { /// Type of blob object, bootstrap or data blob. #[serde(rename = "type")] diff --git a/clib/Cargo.toml b/clib/Cargo.toml index fe5892f8dd2..cbc3bb73718 100644 --- a/clib/Cargo.toml +++ b/clib/Cargo.toml @@ -15,7 +15,7 @@ crate-type = ["cdylib", "staticlib"] [dependencies] libc = "0.2.137" log = "0.4.17" -fuse-backend-rs = { git = "https://github.com/loheagn/fuse-backend-rs.git", branch = "vfs-persist" } +fuse-backend-rs = "^0.11.0" nydus-api = { version = "0.3", path = "../api" } nydus-rafs = { version = "0.3.1", path = "../rafs" } nydus-storage = { version = "0.6.3", path = "../storage" } diff --git a/rafs/Cargo.toml b/rafs/Cargo.toml index acfcf5fd2b7..2e3d9633a66 100644 --- a/rafs/Cargo.toml +++ b/rafs/Cargo.toml @@ -19,7 +19,7 @@ nix = "0.24" serde = { version = "1.0.110", features = ["serde_derive", "rc"] } serde_json = "1.0.53" vm-memory = "0.10" -fuse-backend-rs = { git = "https://github.com/loheagn/fuse-backend-rs.git", branch = "vfs-persist" } +fuse-backend-rs = "^0.11.0" thiserror = "1" nydus-api = { version = "0.3", path = "../api" } diff --git a/rafs/src/fs.rs b/rafs/src/fs.rs index c053d2e0c81..15a1848c4ce 100644 --- a/rafs/src/fs.rs +++ b/rafs/src/fs.rs @@ -502,6 +502,15 @@ impl FileSystem for Rafs { type Inode = Inode; type Handle = Handle; + #[cfg(target_os = "macos")] + fn init(&self, _opts: FsOptions) -> Result { + Ok( + // These fuse features are supported by rafs by default. + FsOptions::ASYNC_READ | FsOptions::BIG_WRITES | FsOptions::ATOMIC_O_TRUNC, + ) + } + + #[cfg(target_os = "linux")] fn init(&self, _opts: FsOptions) -> Result { Ok( // These fuse features are supported by rafs by default. @@ -823,7 +832,10 @@ impl FileSystem for Rafs { _flags: u32, ) -> Result<(Option, OpenOptions)> { // Cache dir since we are readonly - Ok((None, OpenOptions::CACHE_DIR | OpenOptions::KEEP_CACHE)) + #[cfg(target_os = "macos")] + return Ok((None, OpenOptions::KEEP_CACHE)); + #[cfg(target_os = "linux")] + return Ok((None, OpenOptions::CACHE_DIR | OpenOptions::KEEP_CACHE)); } fn releasedir(&self, _ctx: &Context, _inode: u64, _flags: u32, _handle: u64) -> Result<()> { @@ -1033,6 +1045,7 @@ mod tests { assert_eq!(ent.inode, 0); assert_eq!(ent.generation, 0); assert_eq!(ent.attr_flags, 0); + #[cfg(target_os = "linux")] rafs.init(FsOptions::ASYNC_DIO).unwrap(); rafs.open(&Context::default(), Inode::default(), 0, 0) .unwrap(); diff --git a/service/Cargo.toml b/service/Cargo.toml index 5714960b029..3695655923f 100644 --- a/service/Cargo.toml +++ b/service/Cargo.toml @@ -12,9 +12,7 @@ resolver = "2" [dependencies] bytes = { version = "1", optional = true } dbs-allocator = { version = "0.1.1", optional = true } -fuse-backend-rs = { git = "https://github.com/loheagn/fuse-backend-rs.git", branch = "vfs-persist", features = [ - "persist", -] } +fuse-backend-rs = { version = "^0.11.0", features = ["persist"] } libc = "0.2" log = "0.4.8" mio = { version = "0.8", features = ["os-poll", "os-ext"] } diff --git a/service/src/daemon.rs b/service/src/daemon.rs index 911cb416a9f..3170f41cab9 100644 --- a/service/src/daemon.rs +++ b/service/src/daemon.rs @@ -14,7 +14,7 @@ use std::ops::Deref; use std::process; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{Receiver, Sender}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, MutexGuard}; use std::thread::{Builder, JoinHandle}; use mio::{Events, Poll, Token, Waker}; @@ -23,6 +23,7 @@ use rust_fsm::*; use serde::{self, Serialize}; use crate::fs_service::{FsBackendCollection, FsService}; +use crate::upgrade::UpgradeManager; use crate::{BlobCacheMgr, Error, Result}; /// Nydus daemon working states. @@ -170,6 +171,10 @@ pub trait NydusDaemon: DaemonStateMachineSubscriber + Send + Sync { self.on_event(DaemonStateMachineInput::Start) } + fn upgrade_mgr(&self) -> Option> { + None + } + // For backward compatibility. /// Set default filesystem service object. fn get_default_fs_service(&self) -> Option> { diff --git a/service/src/fs_cache.rs b/service/src/fs_cache.rs index f750c1a649a..8f011733372 100644 --- a/service/src/fs_cache.rs +++ b/service/src/fs_cache.rs @@ -266,6 +266,7 @@ impl FsCacheHandler { tag: Option<&str>, blob_cache_mgr: Arc, threads: usize, + restore_file: Option<&File>, ) -> Result { info!( "fscache: create FsCacheHandler with dir {}, tag {}", @@ -273,15 +274,18 @@ impl FsCacheHandler { tag.unwrap_or("") ); - let mut file = OpenOptions::new() - .write(true) - .read(true) - .create(false) - .open(path) - .map_err(|e| { - error!("Failed to open cachefiles device {}. {}", path, e); - e - })?; + let mut file = match restore_file { + None => OpenOptions::new() + .write(true) + .read(true) + .create(false) + .open(path) + .map_err(|e| { + error!("Failed to open cachefiles device {}. {}", path, e); + e + })?, + Some(f) => f.try_clone()?, + }; let poller = Poll::new().map_err(|_e| eother!("fscache: failed to create poller for service"))?; @@ -296,15 +300,21 @@ impl FsCacheHandler { ) .map_err(|_e| eother!("fscache: failed to register fd for service"))?; - // Initialize the fscache session - file.write_all(format!("dir {}", dir).as_bytes())?; - file.flush()?; - if let Some(tag) = tag { - file.write_all(format!("tag {}", tag).as_bytes())?; + if restore_file.is_none() { + // Initialize the fscache session + file.write_all(format!("dir {}", dir).as_bytes())?; + file.flush()?; + if let Some(tag) = tag { + file.write_all(format!("tag {}", tag).as_bytes())?; + file.flush()?; + } + file.write_all(b"bind ondemand")?; + file.flush()?; + } else { + // send restore cmd, if we are in restore process + file.write_all(b"restore")?; file.flush()?; } - file.write_all(b"bind ondemand")?; - file.flush()?; let state = FsCacheState { id_to_object_map: Default::default(), @@ -379,6 +389,10 @@ impl FsCacheHandler { } } + pub fn get_file(&self) -> &File { + &self.file + } + /// Read and process all requests from fscache driver until no data available. fn handle_requests(&self, buf: &mut [u8]) -> Result<()> { loop { diff --git a/service/src/fs_service.rs b/service/src/fs_service.rs index 94c45699cd9..a834f331206 100644 --- a/service/src/fs_service.rs +++ b/service/src/fs_service.rs @@ -120,14 +120,8 @@ pub trait FsService: Send + Sync { ); } if let Some(mut mgr_guard) = self.upgrade_mgr() { - if let Err(e) = mgr_guard.add_mounts_state(cmd, index) { - warn!( - "failed to add filesystem instance to upgrade manager, {}", - e - ); - mgr_guard.disable_upgrade(); - warn!("disable online upgrade due to inconsistent status!!!"); - } + mgr_guard.add_mounts_state(cmd, index); + mgr_guard.save_vfs_stat(self.get_vfs())?; } Ok(()) @@ -161,14 +155,7 @@ pub trait FsService: Send + Sync { } // Update mounts opaque from UpgradeManager if let Some(mut mgr_guard) = self.upgrade_mgr() { - if let Err(e) = mgr_guard.update_mounts_state(cmd) { - warn!( - "failed to update filesystem instance to upgrade manager, {}", - e - ); - mgr_guard.disable_upgrade(); - warn!("disable online upgrade due to inconsistent status!!!"); - } + mgr_guard.update_mounts_state(cmd)?; } Ok(()) @@ -195,12 +182,8 @@ pub trait FsService: Send + Sync { self.backend_collection().del(&cmd.mountpoint); if let Some(mut mgr_guard) = self.upgrade_mgr() { // Remove mount opaque from UpgradeManager - if let Err(e) = mgr_guard.remove_mounts_state(cmd) { - warn!( - "failed to remove filesystem instance from upgrade manager, {}", - e - ); - } + mgr_guard.remove_mounts_state(cmd); + mgr_guard.save_vfs_stat(self.get_vfs())?; } debug!("try to gc unused blobs"); diff --git a/service/src/fusedev.rs b/service/src/fusedev.rs index d36341cb0ef..5dc9da598fa 100644 --- a/service/src/fusedev.rs +++ b/service/src/fusedev.rs @@ -605,6 +605,7 @@ pub fn create_fuse_daemon( error!("service session mount error: {}", &e); eother!(e) })?; + daemon .on_event(DaemonStateMachineInput::Mount) .map_err(|e| eother!(e))?; @@ -615,12 +616,34 @@ pub fn create_fuse_daemon( .service .conn .store(calc_fuse_conn(mnt)?, Ordering::Relaxed); + + if let Some(f) = daemon.service.session.lock().unwrap().get_fuse_file() { + if let Some(mut m) = daemon.service.upgrade_mgr() { + m.hold_file(f).map_err(|e| { + error!("Failed to hold fusedev fd, {:?}", e); + eother!(e) + })?; + m.save_fuse_cid(daemon.service.conn.load(Ordering::Acquire)); + } + } } Ok(daemon) } /// Create vfs backend with rafs or passthrough as the fuse filesystem driver + +#[cfg(target_os = "macos")] +pub fn create_vfs_backend( + _fs_type: FsBackendType, + _is_fuse: bool, + _hybrid_mode: bool, +) -> Result> { + let vfs = fuse_backend_rs::api::Vfs::new(fuse_backend_rs::api::VfsOptions::default()); + Ok(Arc::new(vfs)) +} + +#[cfg(target_os = "linux")] pub fn create_vfs_backend( fs_type: FsBackendType, is_fuse: bool, diff --git a/service/src/singleton.rs b/service/src/singleton.rs index 75c1f61a8f3..6546c93d3ad 100644 --- a/service/src/singleton.rs +++ b/service/src/singleton.rs @@ -5,9 +5,14 @@ //! Nydus daemon to host multiple services, including fscache and fusedev. use std::any::Any; +use std::fs::metadata; +#[cfg(target_os = "linux")] +use std::fs::{File, OpenOptions}; +use std::os::unix::net::UnixStream; +use std::path::Path; use std::sync::atomic::{AtomicBool, AtomicI32, Ordering}; use std::sync::mpsc::{channel, Receiver, Sender}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, MutexGuard}; use mio::Waker; use nydus_api::config::BlobCacheList; @@ -18,10 +23,13 @@ use crate::daemon::{ NydusDaemon, }; use crate::fs_service::FsService; +#[cfg(target_os = "linux")] +use crate::upgrade; +use crate::upgrade::UpgradeManager; use crate::{BlobCacheMgr, Error, Result}; #[allow(dead_code)] -struct ServiceController { +pub struct ServiceController { bti: BuildTimeInfo, id: Option, request_sender: Arc>>, @@ -31,7 +39,7 @@ struct ServiceController { waker: Arc, blob_cache_mgr: Arc, - + upgrade_mgr: Option>, fscache_enabled: AtomicBool, #[cfg(target_os = "linux")] fscache: Mutex>>, @@ -97,11 +105,12 @@ impl ServiceController { #[cfg(target_os = "linux")] impl ServiceController { - fn initialize_fscache_service( + pub fn initialize_fscache_service( &self, tag: Option<&str>, - threads: Option<&str>, + threads: usize, path: &str, + file: Option<&File>, ) -> std::io::Result<()> { // Validate --fscache option value is an existing directory. let p = match std::path::Path::new(&path).canonicalize() { @@ -125,12 +134,6 @@ impl ServiceController { } }; - let threads = if let Some(threads_value) = threads { - crate::validate_threads_configuration(threads_value).map_err(|err| einval!(err))? - } else { - 1usize - }; - info!( "Create fscache instance at {} with tag {}, {} working threads", p, @@ -143,12 +146,22 @@ impl ServiceController { tag, self.blob_cache_mgr.clone(), threads, + file, )?; *self.fscache.lock().unwrap() = Some(Arc::new(fscache)); self.fscache_enabled.store(true, Ordering::Release); Ok(()) } + + fn get_fscache_file(&self) -> std::io::Result { + if let Some(fscache) = self.fscache.lock().unwrap().clone() { + let f = fscache.get_file().try_clone()?; + Ok(f) + } else { + Err(einval!("fscache file not init")) + } + } } impl NydusDaemon for ServiceController { @@ -191,11 +204,21 @@ impl NydusDaemon for ServiceController { } fn save(&self) -> Result<()> { - Err(Error::Unsupported) + #[cfg(target_os = "linux")] + return upgrade::fscache_upgrade::save(self); + #[cfg(target_os = "macos")] + return Ok(()); } fn restore(&self) -> Result<()> { - Err(Error::Unsupported) + #[cfg(target_os = "linux")] + return upgrade::fscache_upgrade::restore(self); + #[cfg(target_os = "macos")] + return Ok(()); + } + + fn upgrade_mgr(&self) -> Option> { + self.upgrade_mgr.as_ref().map(|mgr| mgr.lock().unwrap()) } fn get_default_fs_service(&self) -> Option> { @@ -235,6 +258,36 @@ impl DaemonStateMachineSubscriber for ServiceController { } } +#[allow(unused)] +fn is_sock_residual(sock: impl AsRef) -> bool { + if metadata(&sock).is_ok() { + return UnixStream::connect(&sock).is_err(); + } + + false +} +/// When nydusd starts, it checks that whether a previous nydusd died unexpected by: +/// 1. Checking whether /dev/cachefiles can be opened. +/// 2. Checking whether the API socket exists and the connection can established or not. +fn is_crashed(_sock: &impl AsRef) -> Result { + #[cfg(target_os = "linux")] + if let Err(_e) = OpenOptions::new() + .write(true) + .read(true) + .create(false) + .open("/dev/cachefiles") + { + warn!("cachefiles devfd can not open, the devfd may hold by supervisor or another daemon."); + if is_sock_residual(_sock) { + warn!("A previous daemon crashed! Try to failover later."); + return Ok(true); + } + warn!("another daemon is running, will exit!"); + return Err(Error::Unsupported); + } + Ok(false) +} + /// Create and start a Nydus daemon to host fscache and fusedev services. #[allow(clippy::too_many_arguments, unused)] pub fn create_daemon( @@ -246,40 +299,67 @@ pub fn create_daemon( config: Option, bti: BuildTimeInfo, waker: Arc, + api_sock: Option>, + upgrade: bool, ) -> std::io::Result> { let (to_sm, from_client) = channel::(); let (to_client, from_sm) = channel::>(); + let upgrade_mgr = supervisor + .as_ref() + .map(|s| Mutex::new(UpgradeManager::new(s.to_string().into()))); + let service_controller = ServiceController { bti, id, request_sender: Arc::new(Mutex::new(to_sm)), result_receiver: Mutex::new(from_sm), - state: Default::default(), + state: AtomicI32::new(DaemonState::INIT as i32), supervisor, waker, blob_cache_mgr: Arc::new(BlobCacheMgr::new()), - + upgrade_mgr, fscache_enabled: AtomicBool::new(false), #[cfg(target_os = "linux")] fscache: Mutex::new(None), }; service_controller.initialize_blob_cache(&config)?; - #[cfg(target_os = "linux")] - if let Some(path) = fscache { - service_controller.initialize_fscache_service(tag, threads, path)?; - } let daemon = Arc::new(service_controller); let machine = DaemonStateMachineContext::new(daemon.clone(), from_client, to_client); machine.kick_state_machine()?; - daemon - .on_event(DaemonStateMachineInput::Mount) - .map_err(|e| eother!(e))?; - daemon - .on_event(DaemonStateMachineInput::Start) - .map_err(|e| eother!(e))?; + + // Without api socket, nydusd can't do neither live-upgrade nor failover, so the helper + // finding a victim is not necessary. + if (api_sock.as_ref().is_some() && !upgrade && !is_crashed(api_sock.as_ref().unwrap())?) + || api_sock.is_none() + { + #[cfg(target_os = "linux")] + if let Some(path) = fscache { + let threads = if let Some(threads_value) = threads { + crate::validate_threads_configuration(threads_value).map_err(|err| einval!(err))? + } else { + 1usize + }; + daemon.initialize_fscache_service(tag, threads, path, None)?; + let f = daemon.get_fscache_file()?; + if let Some(mut mgr_guard) = daemon.upgrade_mgr() { + mgr_guard.hold_file(&f).map_err(|e| { + error!("Failed to hold fscache fd, {:?}", e); + eother!(e) + })?; + mgr_guard.save_fscache_states(threads, path.to_string()); + } + } + + daemon + .on_event(DaemonStateMachineInput::Mount) + .map_err(|e| eother!(e))?; + daemon + .on_event(DaemonStateMachineInput::Start) + .map_err(|e| eother!(e))?; + } Ok(daemon) } @@ -316,6 +396,7 @@ mod tests { supervisor: Some(String::from("supervisor")), waker: Arc::new(waker), blob_cache_mgr: Arc::new(BlobCacheMgr::new()), + upgrade_mgr: None, fscache_enabled: AtomicBool::new(false), fscache: Mutex::new(None), } @@ -326,19 +407,19 @@ mod tests { let service_controller = create_service_controller(); assert!(service_controller - .initialize_fscache_service(None, None, "some path") + .initialize_fscache_service(None, 1, "some path", None) .is_err()); let mut p = std::env::current_dir().unwrap(); p.push("Cargo.toml"); assert!(service_controller - .initialize_fscache_service(None, None, p.to_str().unwrap()) + .initialize_fscache_service(None, 1, p.to_str().unwrap(), None) .is_err()); let tmp_dir = TempDir::new().unwrap(); let dir = tmp_dir.as_path().to_str().unwrap(); assert!(service_controller - .initialize_fscache_service(None, Some("1"), dir) + .initialize_fscache_service(None, 1, dir, None) .is_ok()); assert_eq!(service_controller.id(), Some(String::from("id"))); diff --git a/service/src/upgrade.rs b/service/src/upgrade.rs index afae3506450..1d99231f648 100644 --- a/service/src/upgrade.rs +++ b/service/src/upgrade.rs @@ -7,16 +7,20 @@ use std::any::TypeId; use std::collections::HashMap; use std::convert::{TryFrom, TryInto}; +use std::fs::File; use std::io; -use std::os::fd::RawFd; +use std::os::fd::{AsRawFd, FromRawFd}; use std::path::PathBuf; -use std::sync::atomic::{AtomicBool, Ordering}; +use nydus_api::BlobCacheEntry; use nydus_upgrade::backend::unix_domain_socket::UdsStorageBackend; use nydus_upgrade::backend::{StorageBackend, StorageBackendErr}; use crate::fs_service::{FsBackendMountCmd, FsBackendUmountCmd}; use crate::{Error, Result}; +use fuse_backend_rs::api::Vfs; +use versionize::{VersionMap, Versionize, VersionizeResult}; +use versionize_derive::Versionize; /// Error codes related to upgrade manager. #[derive(thiserror::Error, Debug)] @@ -26,11 +30,14 @@ pub enum UpgradeMgrError { #[error("failed to save/restore data via the backend, {0}")] StorageBackendError(StorageBackendErr), - #[error("failed to serialize, {0}")] Serialize(io::Error), #[error("failed to deserialize, {0}")] Deserialize(io::Error), + #[error("failed to clone file, {0}")] + CloneFile(io::Error), + #[error("failed to initialize fscache driver, {0}")] + InitializeFscache(io::Error), } impl From for Error { @@ -68,55 +75,121 @@ impl TryFrom<&String> for FailoverPolicy { } } -const MAX_STATE_DATA_LENGTH: usize = 1024 * 32; +struct FscacheState { + blob_entry_map: HashMap, + threads: usize, + path: String, +} + +#[derive(Versionize, Clone)] +struct MountStateWrapper { + cmd: FsBackendMountCmd, + vfs_index: u8, +} + +struct FusedevState { + fs_mount_cmd_map: HashMap, + vfs_state_data: Vec, + fuse_conn_id: u64, +} /// Online upgrade manager. pub struct UpgradeManager { - // backend_mount_cmd_map records the mount command of each backend filesystem. - // the structure is: mountpoint -> (FsBackendMountCmd, vfs_index) - backend_mount_cmd_map: HashMap, - fds: Vec, + fscache_deamon_stat: FscacheState, + fuse_deamon_stat: FusedevState, + file: Option, backend: Box, - - disabled: AtomicBool, } impl UpgradeManager { /// Create a new instance of [UpgradeManager]. pub fn new(socket_path: PathBuf) -> Self { UpgradeManager { - backend_mount_cmd_map: HashMap::new(), + fscache_deamon_stat: FscacheState { + blob_entry_map: HashMap::new(), + threads: 1, + path: "".to_string(), + }, + fuse_deamon_stat: FusedevState { + fs_mount_cmd_map: HashMap::new(), + vfs_state_data: vec![], + fuse_conn_id: 0, + }, + file: None, backend: Box::new(UdsStorageBackend::new(socket_path)), - fds: Vec::new(), - disabled: AtomicBool::new(false), } } + pub fn add_blob_entry_state(&mut self, entry: BlobCacheEntry) { + let mut blob_state_id = entry.domain_id.to_string(); + blob_state_id.push('/'); + blob_state_id.push_str(&entry.blob_id); + + self.fscache_deamon_stat + .blob_entry_map + .insert(blob_state_id, entry); + } - /// Add a filesystem instance into the upgrade manager. - pub fn add_mounts_state(&mut self, cmd: FsBackendMountCmd, vfs_index: u8) -> Result<()> { - if self.disabled.load(Ordering::Acquire) { - return Err(Error::Unsupported); + pub fn remove_blob_entry_state(&mut self, domain_id: &str, blob_id: &str) { + let mut blob_state_id = domain_id.to_string(); + blob_state_id.push('/'); + // for no shared domain mode, snapshotter will call unbind without blob_id + if !blob_id.is_empty() { + blob_state_id.push_str(blob_id); + } else { + blob_state_id.push_str(domain_id); } - let cmd_map = &mut self.backend_mount_cmd_map; - if cmd_map.contains_key(&cmd.mountpoint) { - return Err(Error::AlreadyExists); + if self + .fscache_deamon_stat + .blob_entry_map + .remove(&blob_state_id) + .is_none() + { + warn!("blob {}: state was not saved before!", blob_state_id) } + } + + pub fn save_fscache_states(&mut self, threads: usize, path: String) { + self.fscache_deamon_stat.path = path; + self.fscache_deamon_stat.threads = threads; + } - cmd_map.insert(cmd.mountpoint.clone(), (cmd, vfs_index)); + pub fn save_fuse_cid(&mut self, fuse_conn_id: u64) { + self.fuse_deamon_stat.fuse_conn_id = fuse_conn_id; + } + + pub fn save_vfs_stat(&mut self, vfs: &Vfs) -> Result<()> { + let vfs_state_data = vfs.save_to_bytes().map_err(|e| { + let io_err = io::Error::new( + io::ErrorKind::Other, + format!("Failed to save vfs state: {:?}", e), + ); + UpgradeMgrError::Serialize(io_err) + })?; + self.fuse_deamon_stat.vfs_state_data = vfs_state_data; Ok(()) } + /// Add a filesystem instance into the upgrade manager. + pub fn add_mounts_state(&mut self, cmd: FsBackendMountCmd, vfs_index: u8) { + let cmd_wrapper = MountStateWrapper { + cmd: cmd.clone(), + vfs_index, + }; + self.fuse_deamon_stat + .fs_mount_cmd_map + .insert(cmd.mountpoint, cmd_wrapper); + } + /// Update a filesystem instance in the upgrade manager. pub fn update_mounts_state(&mut self, cmd: FsBackendMountCmd) -> Result<()> { - if self.disabled.load(Ordering::Acquire) { - return Err(Error::Unsupported); - } - - let cmd_map = &mut self.backend_mount_cmd_map; - match cmd_map.get_mut(&cmd.mountpoint) { - Some(cmd_with_vfs_idx) => { - cmd_with_vfs_idx.0 = cmd; + match self + .fuse_deamon_stat + .fs_mount_cmd_map + .get_mut(&cmd.mountpoint) + { + Some(cmd_wrapper) => { + cmd_wrapper.cmd = cmd; Ok(()) } None => Err(Error::NotFound), @@ -124,140 +197,254 @@ impl UpgradeManager { } /// Remove a filesystem instance from the upgrade manager. - pub fn remove_mounts_state(&mut self, cmd: FsBackendUmountCmd) -> Result<()> { - if self.disabled.load(Ordering::Acquire) { - return Err(Error::Unsupported); - } - - let cmd_map = &mut self.backend_mount_cmd_map; - match cmd_map.get_mut(&cmd.mountpoint) { - Some(_) => { - cmd_map.remove(&cmd.mountpoint); - Ok(()) - } - None => Err(Error::NotFound), + pub fn remove_mounts_state(&mut self, cmd: FsBackendUmountCmd) { + if self + .fuse_deamon_stat + .fs_mount_cmd_map + .remove(&cmd.mountpoint) + .is_none() + { + warn!( + "mount state for {}: state was not saved before!", + cmd.mountpoint + ) } } - /// Disable online upgrade capability. - pub fn disable_upgrade(&mut self) { - self.disabled.store(true, Ordering::Release); - } + /// Save the fd and daemon state data for online upgrade. + fn save(&mut self, data: &[u8]) -> Result<()> { + let mut fds = Vec::new(); + if let Some(ref f) = self.file { + fds.push(f.as_raw_fd()) + } - /// Save the fuse fds and fuse state data for online upgrade. - fn save(&mut self, data: &Vec) -> Result<()> { self.backend - .save(&self.fds, data) + .save(&fds, data) .map_err(UpgradeMgrError::StorageBackendError)?; Ok(()) } - /// Restore the fuse fds and fuse state data for online upgrade. + /// Restore the fd and daemon state data for online upgrade. fn restore(&mut self) -> Result> { - let mut fds = vec![0 as RawFd; 8]; - let mut state_data = vec![0u8; MAX_STATE_DATA_LENGTH]; - let (state_data_len, fds_len) = self + let (fds, state_data) = self .backend - .restore(&mut fds, &mut state_data) + .restore() .map_err(UpgradeMgrError::StorageBackendError)?; - fds.truncate(fds_len); - state_data.truncate(state_data_len); - - self.fds = fds; + if fds.len() != 1 { + warn!("Too many fds {}, we may not correctly handle it", fds.len()); + } + self.file = Some(unsafe { File::from_raw_fd(fds[0]) }); Ok(state_data) } - fn add_fd(&mut self, fd: RawFd) { - self.fds.push(fd); + pub fn hold_file(&mut self, fd: &File) -> Result<()> { + let f = fd.try_clone().map_err(UpgradeMgrError::CloneFile)?; + self.file = Some(f); + + Ok(()) } -} -/// Online upgrade utilities for FUSE daemon. -pub mod fusedev_upgrade { - use std::fs::File; - use std::os::fd::{FromRawFd, RawFd}; - use std::os::unix::io::AsRawFd; - use std::sync::atomic::Ordering; + pub fn return_file(&mut self) -> Option { + if let Some(ref f) = self.file { + // Basically, this can hardly fail. + f.try_clone() + .map_err(|e| { + error!("Clone file error, {}", e); + e + }) + .ok() + } else { + warn!("No file can be returned"); + None + } + } +} +#[cfg(target_os = "linux")] +/// Online upgrade utilities for fscache daemon. +pub mod fscache_upgrade { + use std::convert::TryFrom; + use std::str::FromStr; + use super::*; + use crate::daemon::NydusDaemon; + use crate::singleton::ServiceController; use nydus_upgrade::persist::Snapshotter; use versionize::{VersionMap, Versionize, VersionizeResult}; use versionize_derive::Versionize; + #[derive(Versionize, Clone)] + pub struct BlobCacheEntryState { + json_str: String, + } + + #[derive(Versionize, Clone, Default)] + struct FscacheBackendState { + blob_entry_list: Vec<(String, BlobCacheEntryState)>, + threads: usize, + path: String, + } + + impl Snapshotter for FscacheBackendState { + fn get_versions() -> Vec> { + vec![ + // version 1 + HashMap::from([(FscacheBackendState::type_id(), 1)]), + // more versions for the future + ] + } + } + + impl TryFrom<&FscacheBackendState> for FscacheState { + type Error = std::io::Error; + fn try_from(backend_stat: &FscacheBackendState) -> std::result::Result { + let mut map = HashMap::new(); + for (id, entry_stat) in &backend_stat.blob_entry_list { + let entry = BlobCacheEntry::from_str(&entry_stat.json_str)?; + map.insert(id.to_string(), entry); + } + Ok(FscacheState { + blob_entry_map: map, + threads: backend_stat.threads, + path: backend_stat.path.clone(), + }) + } + } + + impl TryFrom<&FscacheState> for FscacheBackendState { + type Error = std::io::Error; + fn try_from(stat: &FscacheState) -> std::result::Result { + let mut list = Vec::new(); + for (id, entry) in &stat.blob_entry_map { + let entry_stat = serde_json::to_string(&entry)?; + list.push(( + id.to_string(), + BlobCacheEntryState { + json_str: entry_stat, + }, + )); + } + Ok(FscacheBackendState { + blob_entry_list: list, + threads: stat.threads, + path: stat.path.clone(), + }) + } + } + + pub fn save(daemon: &ServiceController) -> Result<()> { + if let Some(mut mgr) = daemon.upgrade_mgr() { + let backend_stat = FscacheBackendState::try_from(&mgr.fscache_deamon_stat) + .map_err(UpgradeMgrError::Serialize)?; + let stat = backend_stat.save().map_err(UpgradeMgrError::Serialize)?; + mgr.save(&stat)?; + } + Ok(()) + } + + pub fn restore(daemon: &ServiceController) -> Result<()> { + if let Some(mut mgr) = daemon.upgrade_mgr() { + if let Some(blob_mgr) = daemon.get_blob_cache_mgr() { + // restore the mgr state via the backend in the mgr + let mut state_data = mgr.restore()?; + + let backend_stat = FscacheBackendState::restore(&mut state_data) + .map_err(UpgradeMgrError::Deserialize)?; + + let stat = + FscacheState::try_from(&backend_stat).map_err(UpgradeMgrError::Deserialize)?; + // restore blob entry + stat.blob_entry_map + .iter() + .try_for_each(|(_, entry)| -> Result<()> { + blob_mgr + .add_blob_entry(entry) + .map_err(UpgradeMgrError::Deserialize)?; + Ok(()) + })?; + + // init fscache daemon with restored fd + if let Some(f) = mgr.return_file() { + daemon + .initialize_fscache_service(None, stat.threads, &stat.path, Some(&f)) + .map_err(UpgradeMgrError::InitializeFscache)?; + } + + //restore upgrade manager fscache stat + mgr.fscache_deamon_stat = stat; + return Ok(()); + } + } + Err(UpgradeMgrError::MissingSupervisorPath.into()) + } +} + +/// Online upgrade utilities for FUSE daemon. +pub mod fusedev_upgrade { + use std::sync::atomic::Ordering; + use super::*; use crate::daemon::NydusDaemon; use crate::fusedev::{FusedevDaemon, FusedevFsService}; + use nydus_upgrade::persist::Snapshotter; + use versionize::{VersionMap, Versionize, VersionizeResult}; + use versionize_derive::Versionize; #[derive(Versionize, Clone, Default)] - struct FusedevState { - backend_fs_mount_cmd_list: Vec<(FsBackendMountCmd, u8)>, + struct FusedevBackendState { + fs_mount_cmd_list: Vec<(String, MountStateWrapper)>, vfs_state_data: Vec, fuse_conn_id: u64, } - impl Snapshotter for FusedevState { + impl Snapshotter for FusedevBackendState { fn get_versions() -> Vec> { vec![ // version 1 - HashMap::from([(FusedevState::type_id(), 1)]), + HashMap::from([(FusedevBackendState::type_id(), 1)]), // more versions for the future ] } } + impl From<&FusedevBackendState> for FusedevState { + fn from(backend_stat: &FusedevBackendState) -> Self { + let mut map = HashMap::new(); + for (mp, mw) in &backend_stat.fs_mount_cmd_list { + map.insert(mp.to_string(), mw.clone()); + } + FusedevState { + fs_mount_cmd_map: map, + vfs_state_data: backend_stat.vfs_state_data.clone(), + fuse_conn_id: backend_stat.fuse_conn_id, + } + } + } + + impl From<&FusedevState> for FusedevBackendState { + fn from(stat: &FusedevState) -> Self { + let mut list = Vec::new(); + for (mp, mw) in &stat.fs_mount_cmd_map { + list.push((mp.to_string(), mw.clone())); + } + FusedevBackendState { + fs_mount_cmd_list: list, + vfs_state_data: stat.vfs_state_data.clone(), + fuse_conn_id: stat.fuse_conn_id, + } + } + } + /// Save state information for a FUSE daemon. pub fn save(daemon: &FusedevDaemon) -> Result<()> { let svc = daemon.get_default_fs_service().ok_or(Error::NotFound)?; - if !svc.get_vfs().initialized() { return Err(Error::NotReady); } let mut mgr = svc.upgrade_mgr().unwrap(); + let backend_stat = FusedevBackendState::from(&mgr.fuse_deamon_stat); - // set fd - let fd = svc - .as_ref() - .as_any() - .downcast_ref::() - .unwrap() - .session - .lock() - .unwrap() - .get_fuse_file() - .unwrap() - .as_raw_fd(); - mgr.add_fd(fd); - - // save vfs state - let vfs_state_data = svc.get_vfs().save_to_bytes().map_err(|e| { - let io_err = io::Error::new( - io::ErrorKind::Other, - format!("Failed to save vfs state: {:?}", e), - ); - UpgradeMgrError::Serialize(io_err) - })?; - - let backend_fs_mount_cmd_list = mgr - .backend_mount_cmd_map - .iter() - .map(|(_, (cmd, vfs_idx))| (cmd.clone(), *vfs_idx)) - .collect(); - - let fuse_conn_id = svc - .as_any() - .downcast_ref::() - .unwrap() - .conn - .load(Ordering::Acquire); - - let state = FusedevState { - backend_fs_mount_cmd_list, - vfs_state_data, - fuse_conn_id, - }; - let state = state.save().map_err(UpgradeMgrError::Serialize)?; - - // save the mgr state via the backend in the mgr + let state = backend_stat.save().map_err(UpgradeMgrError::Serialize)?; mgr.save(&state)?; Ok(()) @@ -276,17 +463,10 @@ pub mod fusedev_upgrade { // restore the mgr state via the backend in the mgr let mut state_data = mgr.restore()?; - let mut state = - FusedevState::restore(&mut state_data).map_err(UpgradeMgrError::Deserialize)?; + let backend_state = + FusedevBackendState::restore(&mut state_data).map_err(UpgradeMgrError::Deserialize)?; - // restore the backend fs mount cmd map - state - .backend_fs_mount_cmd_list - .iter() - .for_each(|(cmd, vfs_idx)| { - mgr.backend_mount_cmd_map - .insert(cmd.mountpoint.clone(), (cmd.clone(), *vfs_idx)); - }); + let mut state = FusedevState::from(&backend_state); // restore the fuse daemon svc.as_any() @@ -296,27 +476,32 @@ pub mod fusedev_upgrade { .store(state.fuse_conn_id, Ordering::Release); // restore fuse fd - svc.as_any() - .downcast_ref::() - .unwrap() - .session - .lock() - .unwrap() - .set_fuse_file(unsafe { File::from_raw_fd(mgr.fds[0] as RawFd) }); + if let Some(f) = mgr.return_file() { + svc.as_any() + .downcast_ref::() + .unwrap() + .session + .lock() + .unwrap() + .set_fuse_file(f); + } // restore vfs svc.get_vfs() .restore_from_bytes(&mut state.vfs_state_data)?; state - .backend_fs_mount_cmd_list + .fs_mount_cmd_map .iter() - .try_for_each(|(cmd, vfs_idx)| -> Result<()> { - svc.restore_mount(cmd, *vfs_idx)?; + .try_for_each(|(_, mount_wrapper)| -> Result<()> { + svc.restore_mount(&mount_wrapper.cmd, mount_wrapper.vfs_index)?; // as we are in upgrade stage and obtain the lock, `unwrap` is safe here - mgr.add_mounts_state(cmd.clone(), *vfs_idx).unwrap(); + //mgr.add_mounts_state(cmd.clone(), *vfs_idx); Ok(()) })?; + //restore upgrade manager fuse stat + mgr.fuse_deamon_stat = state; + Ok(()) } } diff --git a/src/bin/nydusd/api_server_glue.rs b/src/bin/nydusd/api_server_glue.rs index dea9fa7cd92..b1cd72f7c8f 100644 --- a/src/bin/nydusd/api_server_glue.rs +++ b/src/bin/nydusd/api_server_glue.rs @@ -280,6 +280,10 @@ impl ApiServer { e )))) } else { + if let Some(mut mgr_guard) = self.get_daemon_object()?.upgrade_mgr() { + // if started with supervisor, save the blob entry state + mgr_guard.add_blob_entry_state(entry.clone()); + } Ok(ApiResponsePayload::Empty) } } @@ -296,6 +300,9 @@ impl ApiServer { e )))) } else { + if let Some(mut mgr_guard) = self.get_daemon_object()?.upgrade_mgr() { + mgr_guard.remove_blob_entry_state(¶m.domain_id, ¶m.blob_id); + } Ok(ApiResponsePayload::Empty) } } diff --git a/src/bin/nydusd/main.rs b/src/bin/nydusd/main.rs index 357882d0d7f..e06693e7a55 100644 --- a/src/bin/nydusd/main.rs +++ b/src/bin/nydusd/main.rs @@ -545,7 +545,7 @@ fn process_fs_service( fn process_singleton_arguments( subargs: &SubCmdArgs, - _apisock: Option<&str>, + apisock: Option<&str>, bti: BuildTimeInfo, ) -> Result<()> { let id = subargs.value_of("id").map(|id| id.to_string()); @@ -572,6 +572,8 @@ fn process_singleton_arguments( config, bti, DAEMON_CONTROLLER.alloc_waker(), + apisock, + subargs.is_present("upgrade"), ) .map_err(|e| { error!("Failed to start singleton daemon: {}", e); diff --git a/storage/Cargo.toml b/storage/Cargo.toml index 78373874156..84cfd50495f 100644 --- a/storage/Cargo.toml +++ b/storage/Cargo.toml @@ -42,7 +42,7 @@ tokio = { version = "1.19.0", features = [ ] } url = { version = "2.1.1", optional = true } vm-memory = "0.10" -fuse-backend-rs = { git = "https://github.com/loheagn/fuse-backend-rs.git", branch = "vfs-persist" } +fuse-backend-rs = "^0.11.0" gpt = { version = "3.1.0", optional = true } nydus-api = { version = "0.3", path = "../api" } diff --git a/upgrade/src/backend/mod.rs b/upgrade/src/backend/mod.rs index 6e85e4469db..8fad8e20510 100644 --- a/upgrade/src/backend/mod.rs +++ b/upgrade/src/backend/mod.rs @@ -16,15 +16,15 @@ pub enum StorageBackendErr { pub type Result = std::result::Result; -/// StorageBackend trait is used to save and restore the fuse fds and fuse state data for online upgrade. +/// StorageBackend trait is used to save and restore the dev fds and daemon state data for online upgrade. pub trait StorageBackend: Send + Sync { - /// Save the fuse fds and fuse state data for online upgrade. - /// Returns the length of bytes of fuse state data. + /// Save the dev fds and daemon state data for online upgrade. + /// Returns the length of bytes of state data. fn save(&mut self, fds: &[RawFd], data: &[u8]) -> Result; - /// Restore the fuse fds and fuse state data for online upgrade. - /// Returns the length of bytes of fuse state data and the length of fds. - fn restore(&mut self, fds: &mut Vec, data: &mut Vec) -> Result<(usize, usize)>; + /// Restore the dev fds and daemon state data for online upgrade. + /// Returns the fds and state data + fn restore(&mut self) -> Result<(Vec, Vec)>; } #[cfg(test)] @@ -53,18 +53,8 @@ mod test { Ok(self.data.len()) } - fn restore( - &mut self, - fds: &mut Vec, - data: &mut Vec, - ) -> Result<(usize, usize)> { - fds.truncate(self.fds.len()); - fds.copy_from_slice(&self.fds); - - data.truncate(self.data.len()); - data.copy_from_slice(&self.data); - - Ok((data.len(), fds.len())) + fn restore(&mut self) -> Result<(Vec, Vec)> { + Ok((self.fds.clone(), self.data.clone())) } } @@ -73,20 +63,11 @@ mod test { let fds = [5 as RawFd; FDS_LEN]; let data: [u8; DATA_LEN] = [7, 8, 9, 10, 12]; - let mut backend: Box = Box::new(TestStorageBackend::default()); + let mut backend: Box = Box::::default(); let saved_data_len = backend.save(&fds, &data).unwrap(); assert_eq!(saved_data_len, DATA_LEN); - let mut restored_fds = vec![0 as RawFd; 100]; - let mut restored_data = vec![0 as u8; 100]; - let (restored_data_len, restored_fds_len) = backend - .restore(&mut restored_fds, &mut restored_data) - .unwrap(); - assert_eq!(restored_data_len, DATA_LEN); - assert_eq!(restored_fds_len, FDS_LEN); - - restored_data.truncate(restored_data_len); - restored_fds.truncate(restored_fds_len); + let (restored_fds, restored_data) = backend.restore().unwrap(); assert_eq!(restored_data, data); assert_eq!(restored_fds, fds); } diff --git a/upgrade/src/backend/unix_domain_socket.rs b/upgrade/src/backend/unix_domain_socket.rs index 18f727b93d1..c6bd63ca35e 100644 --- a/upgrade/src/backend/unix_domain_socket.rs +++ b/upgrade/src/backend/unix_domain_socket.rs @@ -21,9 +21,11 @@ impl UdsStorageBackend { } } +const MAX_STATE_DATA_LENGTH: usize = 1024 * 32; + impl StorageBackend for UdsStorageBackend { fn save(&mut self, fds: &[RawFd], data: &[u8]) -> Result { - if fds.len() < 1 { + if fds.is_empty() { return Err(StorageBackendErr::NoEnoughFds); } @@ -36,17 +38,19 @@ impl StorageBackend for UdsStorageBackend { Ok(len) } - fn restore(&mut self, fds: &mut Vec, data: &mut Vec) -> Result<(usize, usize)> { + fn restore(&mut self) -> Result<(Vec, Vec)> { + let mut data = vec![0u8; MAX_STATE_DATA_LENGTH]; + let mut fds = vec![0i32; 16]; let socket = UnixStream::connect(&self.socket_path).map_err(StorageBackendErr::CreateUnixStream)?; - let len_pair = socket - .recv_with_fd(data, fds) + let (_, fds_cnt) = socket + .recv_with_fd(data.as_mut_slice(), fds.as_mut_slice()) .map_err(StorageBackendErr::RecvFd)?; - if fds.len() < 1 { + if fds.is_empty() { return Err(StorageBackendErr::NoEnoughFds); } - - Ok(len_pair) + fds.truncate(fds_cnt); + Ok((fds, data)) } } From 2fddd1c66741c3964d46dc80f88c4a1d909b3bf2 Mon Sep 17 00:00:00 2001 From: Xin Yin Date: Thu, 7 Dec 2023 12:10:54 +0800 Subject: [PATCH 3/4] service: add unit test for upgrade mananger Signed-off-by: Xin Yin --- service/src/upgrade.rs | 125 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 123 insertions(+), 2 deletions(-) diff --git a/service/src/upgrade.rs b/service/src/upgrade.rs index 1d99231f648..dcfc5117823 100644 --- a/service/src/upgrade.rs +++ b/service/src/upgrade.rs @@ -278,7 +278,7 @@ pub mod fscache_upgrade { } #[derive(Versionize, Clone, Default)] - struct FscacheBackendState { + pub struct FscacheBackendState { blob_entry_list: Vec<(String, BlobCacheEntryState)>, threads: usize, path: String, @@ -390,7 +390,7 @@ pub mod fusedev_upgrade { use versionize_derive::Versionize; #[derive(Versionize, Clone, Default)] - struct FusedevBackendState { + pub struct FusedevBackendState { fs_mount_cmd_list: Vec<(String, MountStateWrapper)>, vfs_state_data: Vec, fuse_conn_id: u64, @@ -509,6 +509,13 @@ pub mod fusedev_upgrade { #[cfg(test)] mod tests { use super::*; + use crate::fs_service::{FsBackendMountCmd, FsBackendUmountCmd}; + #[cfg(target_os = "linux")] + use crate::upgrade::fscache_upgrade::FscacheBackendState; + use crate::upgrade::fusedev_upgrade::FusedevBackendState; + use crate::FsBackendType; + use nydus_upgrade::persist::Snapshotter; + use vmm_sys_util::tempfile::TempFile; #[test] fn test_failover_policy() { @@ -542,4 +549,118 @@ mod tests { assert!(FailoverPolicy::try_from(s).is_err()); } } + + #[test] + #[cfg(target_os = "linux")] + fn test_upgrade_manager_for_fscache() { + let mut upgrade_mgr = UpgradeManager::new("dummy_socket".into()); + + let content = r#"{ + "type": "bootstrap", + "id": "blob1", + "config": { + "id": "cache1", + "backend_type": "localfs", + "backend_config": {}, + "cache_type": "fscache", + "cache_config": {}, + "metadata_path": "/tmp/metadata1" + }, + "domain_id": "domain1" + }"#; + let entry: BlobCacheEntry = serde_json::from_str(content).unwrap(); + upgrade_mgr.save_fscache_states(4, "/tmp/fscache_dir".to_string()); + assert_eq!(upgrade_mgr.fscache_deamon_stat.threads, 4); + assert_eq!(upgrade_mgr.fscache_deamon_stat.path, "/tmp/fscache_dir"); + + upgrade_mgr.add_blob_entry_state(entry); + assert!(upgrade_mgr + .fscache_deamon_stat + .blob_entry_map + .get("domain1/blob1") + .is_some()); + + assert!(FscacheBackendState::try_from(&upgrade_mgr.fscache_deamon_stat).is_ok()); + + let backend_stat = FscacheBackendState::try_from(&upgrade_mgr.fscache_deamon_stat).unwrap(); + assert!(backend_stat.save().is_ok()); + assert!(FscacheState::try_from(&backend_stat).is_ok()); + let stat = FscacheState::try_from(&backend_stat).unwrap(); + assert_eq!(stat.path, upgrade_mgr.fscache_deamon_stat.path); + assert_eq!(stat.threads, upgrade_mgr.fscache_deamon_stat.threads); + assert!(stat.blob_entry_map.get("domain1/blob1").is_some()); + + upgrade_mgr.remove_blob_entry_state("domain1", "blob1"); + assert!(upgrade_mgr + .fscache_deamon_stat + .blob_entry_map + .get("domain1/blob1") + .is_none()); + } + + #[test] + fn test_upgrade_manager_for_fusedev() { + let mut upgrade_mgr = UpgradeManager::new("dummy_socket".into()); + + let config = r#"{ + "version": 2, + "id": "factory1", + "backend": { + "type": "localfs", + "localfs": { + "dir": "/tmp/nydus" + } + }, + "cache": { + "type": "fscache", + "fscache": { + "work_dir": "/tmp/nydus" + } + }, + "metadata_path": "/tmp/nydus/bootstrap1" + }"#; + let cmd = FsBackendMountCmd { + fs_type: FsBackendType::Rafs, + config: config.to_string(), + mountpoint: "testmonutount".to_string(), + source: "testsource".to_string(), + prefetch_files: Some(vec!["testfile".to_string()]), + }; + + upgrade_mgr.save_fuse_cid(10); + assert_eq!(upgrade_mgr.fuse_deamon_stat.fuse_conn_id, 10); + upgrade_mgr.add_mounts_state(cmd.clone(), 5); + assert!(upgrade_mgr + .fuse_deamon_stat + .fs_mount_cmd_map + .get("testmonutount") + .is_some()); + assert!(upgrade_mgr.update_mounts_state(cmd).is_ok()); + + let backend_stat = FusedevBackendState::from(&upgrade_mgr.fuse_deamon_stat); + assert!(backend_stat.save().is_ok()); + + let stat = FusedevState::from(&backend_stat); + assert_eq!(stat.fuse_conn_id, upgrade_mgr.fuse_deamon_stat.fuse_conn_id); + assert!(stat.fs_mount_cmd_map.get("testmonutount").is_some()); + + let umount_cmd: FsBackendUmountCmd = FsBackendUmountCmd { + mountpoint: "testmonutount".to_string(), + }; + upgrade_mgr.remove_mounts_state(umount_cmd); + assert!(upgrade_mgr + .fuse_deamon_stat + .fs_mount_cmd_map + .get("testmonutount") + .is_none()); + } + + #[test] + fn test_upgrade_manager_hold_fd() { + let mut upgrade_mgr = UpgradeManager::new("dummy_socket".into()); + + let temp = TempFile::new().unwrap().into_file(); + assert!(upgrade_mgr.hold_file(&temp).is_ok()); + assert!(upgrade_mgr.return_file().is_some()); + } } From a038f89581308220d2e3e6973d925bbd62cee4d6 Mon Sep 17 00:00:00 2001 From: Xin Yin Date: Thu, 7 Dec 2023 15:03:31 +0800 Subject: [PATCH 4/4] upgrade: change to use dbs_snapshot crate Signed-off-by: Xin Yin --- Cargo.lock | 13 +------------ service/src/fs_service.rs | 2 +- service/src/upgrade.rs | 8 ++++---- upgrade/Cargo.toml | 2 +- upgrade/src/backend/mod.rs | 4 ++++ upgrade/src/persist.rs | 14 ++++++++------ 6 files changed, 19 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 83915ecb19e..71d502afb95 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1391,8 +1391,8 @@ dependencies = [ name = "nydus-upgrade" version = "0.1.0" dependencies = [ + "dbs-snapshot", "sendfd", - "snapshot", "thiserror", "versionize", "versionize_derive", @@ -1951,17 +1951,6 @@ version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2fd0db749597d91ff862fd1d55ea87f7855a744a8425a64695b6fca237d1dad1" -[[package]] -name = "snapshot" -version = "0.1.0" -source = "git+https://github.com/firecracker-microvm/firecracker?tag=v1.4.1#02dd0328589c59ae11b7ad13eaf412410f8e72e1" -dependencies = [ - "libc", - "thiserror", - "versionize", - "versionize_derive", -] - [[package]] name = "socket2" version = "0.4.4" diff --git a/service/src/fs_service.rs b/service/src/fs_service.rs index a834f331206..a1ee0a6b683 100644 --- a/service/src/fs_service.rs +++ b/service/src/fs_service.rs @@ -29,7 +29,7 @@ use crate::upgrade::UpgradeManager; use crate::{Error, FsBackendDescriptor, FsBackendType, Result}; /// Request structure to mount a filesystem instance. -#[derive(Clone, Versionize)] +#[derive(Clone, Versionize, Debug)] pub struct FsBackendMountCmd { /// Filesystem type. pub fs_type: FsBackendType, diff --git a/service/src/upgrade.rs b/service/src/upgrade.rs index dcfc5117823..821ea4031af 100644 --- a/service/src/upgrade.rs +++ b/service/src/upgrade.rs @@ -81,7 +81,7 @@ struct FscacheState { path: String, } -#[derive(Versionize, Clone)] +#[derive(Versionize, Clone, Debug)] struct MountStateWrapper { cmd: FsBackendMountCmd, vfs_index: u8, @@ -272,12 +272,12 @@ pub mod fscache_upgrade { use versionize::{VersionMap, Versionize, VersionizeResult}; use versionize_derive::Versionize; - #[derive(Versionize, Clone)] + #[derive(Versionize, Clone, Debug)] pub struct BlobCacheEntryState { json_str: String, } - #[derive(Versionize, Clone, Default)] + #[derive(Versionize, Clone, Default, Debug)] pub struct FscacheBackendState { blob_entry_list: Vec<(String, BlobCacheEntryState)>, threads: usize, @@ -389,7 +389,7 @@ pub mod fusedev_upgrade { use versionize::{VersionMap, Versionize, VersionizeResult}; use versionize_derive::Versionize; - #[derive(Versionize, Clone, Default)] + #[derive(Versionize, Clone, Default, Debug)] pub struct FusedevBackendState { fs_mount_cmd_list: Vec<(String, MountStateWrapper)>, vfs_state_data: Vec, diff --git a/upgrade/Cargo.toml b/upgrade/Cargo.toml index ee78c92cabc..9b13902cae5 100644 --- a/upgrade/Cargo.toml +++ b/upgrade/Cargo.toml @@ -10,7 +10,7 @@ edition = "2021" [dependencies] sendfd = "0.4.3" -snapshot = { git = "https://github.com/firecracker-microvm/firecracker", tag = "v1.4.1" } +dbs-snapshot = "1.5.0" thiserror = "1" versionize_derive = "0.1.6" versionize = "0.1.10" diff --git a/upgrade/src/backend/mod.rs b/upgrade/src/backend/mod.rs index 8fad8e20510..2135ba2b04c 100644 --- a/upgrade/src/backend/mod.rs +++ b/upgrade/src/backend/mod.rs @@ -1,3 +1,7 @@ +// Copyright 2023 Nydus Developers. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + use std::{io, os::fd::RawFd}; pub mod unix_domain_socket; diff --git a/upgrade/src/persist.rs b/upgrade/src/persist.rs index adced9b1d52..8a86ecb18fe 100644 --- a/upgrade/src/persist.rs +++ b/upgrade/src/persist.rs @@ -2,10 +2,11 @@ // // SPDX-License-Identifier: Apache-2.0 +use std::fmt::Debug; use std::io::{Error as IoError, ErrorKind, Result}; use std::{any::TypeId, collections::HashMap}; -use snapshot::Snapshot; +use dbs_snapshot::Snapshot; use versionize::{VersionMap, Versionize}; /// A list of versions. @@ -14,7 +15,7 @@ type Versions = Vec>; /// A trait for snapshotting. /// This trait is used to save and restore a struct /// which implements `versionize::Versionize`. -pub trait Snapshotter: Versionize + Sized { +pub trait Snapshotter: Versionize + Sized + Debug { /// Returns a list of versions. fn get_versions() -> Versions; @@ -55,11 +56,12 @@ pub trait Snapshotter: Versionize + Sized { /// Restores the struct from a `Vec`. fn restore(buf: &mut Vec) -> Result { - Snapshot::load(&mut buf.as_slice(), buf.len(), Self::new_version_map()).map_err(|e| { - IoError::new( + match Snapshot::load(&mut buf.as_slice(), buf.len(), Self::new_version_map()) { + Ok((o, _)) => Ok(o), + Err(e) => Err(IoError::new( ErrorKind::Other, format!("Failed to load snapshot: {:?}", e), - ) - }) + )), + } } }