From f77d93bd4cb09010e45a8f3cab839fb755ab6277 Mon Sep 17 00:00:00 2001
From: Jiang Liu
Date: Thu, 22 Jun 2023 13:49:02 +0800
Subject: [PATCH] tarfs: refine tarfs overall design and implementation details

1) move loopdev setup from Prepare()/Merge() into Mount(), preparing for
   support of nodev mode in addition to blockdev.
2) store the tar file in the global cache directory, and mount tarfs at
   "mnt" instead of "tarfs", to keep consistency with the fusedev/fscache
   drivers.
3) use a tempfile/rename mechanism to ease error recovery.
4) use locks to avoid some possible race conditions.
5) use mode 0750 instead of 0755 for new directories.
6) associate a Manager with filesystems managed by blockdev.

Signed-off-by: Jiang Liu
---
 pkg/filesystem/config.go        |  20 +-
 pkg/filesystem/fs.go            |  49 +++--
 pkg/filesystem/tarfs_adaptor.go |  12 +-
 pkg/label/label.go              |  15 +-
 pkg/manager/manager.go          |  23 +--
 pkg/metrics/serve.go            |   4 +-
 pkg/tarfs/tarfs.go              | 343 ++++++++++++++++++++------------
 snapshot/process.go             |  92 +++++----
 snapshot/snapshot.go            | 154 +++++++++-----
 9 files changed, 444 insertions(+), 268 deletions(-)

diff --git a/pkg/filesystem/config.go b/pkg/filesystem/config.go
index 6aecfdca77..4f2fb668bd 100644
--- a/pkg/filesystem/config.go
+++ b/pkg/filesystem/config.go
@@ -29,18 +29,18 @@ func WithNydusImageBinaryPath(p string) NewFSOpt {
 func WithManager(pm *manager.Manager) NewFSOpt {
 	return func(fs *Filesystem) error {
-		if pm == nil {
-			return errors.New("process manager cannot be nil")
+		if pm != nil {
+			switch pm.FsDriver {
+			case config.FsDriverBlockdev:
+				fs.blockdevManager = pm
+			case config.FsDriverFscache:
+				fs.fscacheManager = pm
+			case config.FsDriverFusedev:
+				fs.fusedevManager = pm
+			}
+			fs.enabledManagers = append(fs.enabledManagers, pm)
 		}
 
-		if pm.FsDriver == config.FsDriverFusedev {
-			fs.fusedevManager = pm
-		} else if pm.FsDriver == config.FsDriverFscache {
-			fs.fscacheManager = pm
-		}
-
-		fs.enabledManagers = append(fs.enabledManagers, pm)
-
 		return nil
 	}
 }

diff --git a/pkg/filesystem/fs.go b/pkg/filesystem/fs.go
index c4992bdafd..60ef16d41d 100644
--- a/pkg/filesystem/fs.go
+++ b/pkg/filesystem/fs.go
@@ -21,6 +21,7 @@ import (
 
 	"github.com/containerd/containerd/log"
 	"github.com/containerd/containerd/snapshots"
+	"github.com/containerd/containerd/snapshots/storage"
 	snpkg "github.com/containerd/containerd/pkg/snapshotters"
 
 	"github.com/containerd/nydus-snapshotter/config"
@@ -29,6 +30,7 @@ import (
 	"github.com/containerd/nydus-snapshotter/pkg/daemon"
 	"github.com/containerd/nydus-snapshotter/pkg/daemon/types"
 	"github.com/containerd/nydus-snapshotter/pkg/errdefs"
+	"github.com/containerd/nydus-snapshotter/pkg/label"
 	"github.com/containerd/nydus-snapshotter/pkg/manager"
 	"github.com/containerd/nydus-snapshotter/pkg/referrer"
 	"github.com/containerd/nydus-snapshotter/pkg/signature"
@@ -212,21 +214,19 @@ func (fs *Filesystem) WaitUntilReady(snapshotID string) error {
 // Mount will be called when containerd snapshotter prepare remote snapshotter
 // this method will fork nydus daemon and manage it in the internal store, and indexed by snapshotID
 // It must set up all necessary resources during Mount procedure and revoke any step if necessary.
-func (fs *Filesystem) Mount(snapshotID string, labels map[string]string, isTarfs bool) (err error) {
+func (fs *Filesystem) Mount(snapshotID string, labels map[string]string, s *storage.Snapshot) (err error) {
+	// Do not create RAFS instance in case of nodev.
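Note on the WithManager() hunk above: it no longer rejects a nil manager but silently skips it and dispatches on pm.FsDriver, so the constructor can pass every per-driver manager unconditionally. A self-contained sketch of the pattern, using reduced stand-in types rather than the real nydus-snapshotter API:

    package main

    import "fmt"

    type Manager struct{ FsDriver string }

    type Filesystem struct {
        blockdevManager, fscacheManager, fusedevManager *Manager
        enabledManagers                                 []*Manager
    }

    type NewFSOpt func(fs *Filesystem) error

    func WithManager(pm *Manager) NewFSOpt {
        return func(fs *Filesystem) error {
            if pm != nil { // a nil manager is skipped instead of raising an error
                switch pm.FsDriver {
                case "blockdev":
                    fs.blockdevManager = pm
                case "fscache":
                    fs.fscacheManager = pm
                case "fusedev":
                    fs.fusedevManager = pm
                }
                fs.enabledManagers = append(fs.enabledManagers, pm)
            }
            return nil
        }
    }

    func main() {
        fs := &Filesystem{}
        for _, opt := range []NewFSOpt{WithManager(&Manager{FsDriver: "fusedev"}), WithManager(nil)} {
            if err := opt(fs); err != nil {
                fmt.Println("option failed:", err)
            }
        }
        fmt.Println("enabled managers:", len(fs.enabledManagers)) // prints 1
    }

The same nil-tolerant style is applied to metrics.WithProcessManager() in the serve.go hunk further below.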
+ if !fs.DaemonBacked() { + return nil + } + fsDriver := config.GetFsDriver() - if isTarfs { + if label.IsTarfsDataLayer(labels) { fsDriver = config.FsDriverBlockdev - } else if !fs.DaemonBacked() { - fsDriver = config.FsDriverNodev } isSharedFusedev := fsDriver == config.FsDriverFusedev && config.GetDaemonMode() == config.DaemonModeShared useSharedDaemon := fsDriver == config.FsDriverFscache || isSharedFusedev - // Do not create RAFS instance in case of nodev. - if fsDriver == config.FsDriverNodev { - return nil - } - var imageID string imageID, ok := labels[snpkg.TargetRefLabel] if !ok { @@ -258,7 +258,7 @@ func (fs *Filesystem) Mount(snapshotID string, labels map[string]string, isTarfs }() fsManager, err := fs.getManager(fsDriver) - if err != nil && fsDriver != config.FsDriverBlockdev { + if err != nil { return errors.Wrapf(err, "get filesystem manager for snapshot %s", snapshotID) } @@ -347,19 +347,17 @@ func (fs *Filesystem) Mount(snapshotID string, labels map[string]string, isTarfs return errors.Wrapf(err, "mount file system by daemon %s, snapshot %s", d.ID(), snapshotID) } case config.FsDriverBlockdev: - mountPoint := path.Join(rafs.GetSnapshotDir(), "tarfs") - err = fs.tarfsMgr.MountTarErofs(snapshotID, mountPoint) + err = fs.tarfsMgr.MountTarErofs(snapshotID, s, rafs) if err != nil { return errors.Wrapf(err, "mount tarfs for snapshot %s", snapshotID) } - rafs.SetMountpoint(mountPoint) + default: + return errors.Errorf("unknown filesystem driver %s for snapshot %s", fsDriver, snapshotID) } // Persist it after associate instance after all the states are calculated. - if fsDriver == config.FsDriverFscache || fsDriver == config.FsDriverFusedev { - if err := fsManager.NewInstance(rafs); err != nil { - return errors.Wrapf(err, "create instance %s", snapshotID) - } + if err := fsManager.NewInstance(rafs); err != nil { + return errors.Wrapf(err, "create instance %s", snapshotID) } return nil @@ -373,12 +371,18 @@ func (fs *Filesystem) Umount(ctx context.Context, snapshotID string) error { } fsDriver := instance.GetFsDriver() + if fsDriver == config.FsDriverNodev { + return nil + } fsManager, err := fs.getManager(fsDriver) - if err != nil && fsDriver != config.FsDriverBlockdev { + if err != nil { return errors.Wrapf(err, "get manager for filesystem instance %s", instance.DaemonID) } - if fsDriver == config.FsDriverFscache || fsDriver == config.FsDriverFusedev { + switch fsDriver { + case config.FsDriverFscache: + fallthrough + case config.FsDriverFusedev: daemon, err := fs.getDaemonByRafs(instance) if err != nil { log.L.Debugf("snapshot %s has no associated nydusd", snapshotID) @@ -398,10 +402,15 @@ func (fs *Filesystem) Umount(ctx context.Context, snapshotID string) error { return errors.Wrapf(err, "destroy daemon %s", daemon.ID()) } } - } else if fsDriver == config.FsDriverBlockdev { + case config.FsDriverBlockdev: if err := fs.tarfsMgr.UmountTarErofs(snapshotID); err != nil { return errors.Wrapf(err, "umount tar erofs on snapshot %s", snapshotID) } + if err := fsManager.RemoveInstance(snapshotID); err != nil { + return errors.Wrapf(err, "remove snapshot %s", snapshotID) + } + default: + return errors.Errorf("unknown filesystem driver %s for snapshot %s", fsDriver, snapshotID) } return nil } diff --git a/pkg/filesystem/tarfs_adaptor.go b/pkg/filesystem/tarfs_adaptor.go index af1f4dcd0f..86824e408f 100755 --- a/pkg/filesystem/tarfs_adaptor.go +++ b/pkg/filesystem/tarfs_adaptor.go @@ -20,7 +20,7 @@ func (fs *Filesystem) TarfsEnabled() bool { return fs.tarfsMgr != nil } -func (fs 
*Filesystem) PrepareTarfsLayer(ctx context.Context, labels map[string]string, snapshotID, storagePath string) error { +func (fs *Filesystem) PrepareTarfsLayer(ctx context.Context, labels map[string]string, snapshotID, upperDirPath string) error { ref, ok := labels[snpkg.TargetRefLabel] if !ok { return errors.Errorf("not found image reference label") @@ -50,8 +50,8 @@ func (fs *Filesystem) PrepareTarfsLayer(ctx context.Context, labels map[string]s } go func() { - if err := fs.tarfsMgr.PrepareLayer(snapshotID, ref, manifestDigest, layerDigest, storagePath); err != nil { - log.L.WithError(err).Errorf("async prepare Tarfs layer of snapshot ID %s", snapshotID) + if err := fs.tarfsMgr.PrepareLayer(snapshotID, ref, manifestDigest, layerDigest, upperDirPath); err != nil { + log.L.WithError(err).Errorf("async prepare tarfs layer of snapshot ID %s", snapshotID) } if limiter != nil { limiter.Release(1) @@ -70,9 +70,9 @@ func (fs *Filesystem) DetachTarfsLayer(snapshotID string) error { } func (fs *Filesystem) IsTarfsLayer(snapshotID string) bool { - return fs.tarfsMgr.CheckTarfsLayer(snapshotID, false) + return fs.tarfsMgr.IsTarfsLayer(snapshotID) } -func (fs *Filesystem) IsMergedTarfsLayer(snapshotID string) bool { - return fs.tarfsMgr.CheckTarfsLayer(snapshotID, true) +func (fs *Filesystem) IsMountedTarfsLayer(snapshotID string) bool { + return fs.tarfsMgr.IsMountedTarfsLayer(snapshotID) } diff --git a/pkg/label/label.go b/pkg/label/label.go index 7621275d78..ffe960be19 100644 --- a/pkg/label/label.go +++ b/pkg/label/label.go @@ -34,6 +34,8 @@ const ( NydusMetaLayer = "containerd.io/snapshot/nydus-bootstrap" // The referenced blob sha256 in format of `sha256:xxx`, set by image builders. NydusRefLayer = "containerd.io/snapshot/nydus-ref" + // A bool flag to mark the layer as a nydus tarfs, set by the snapshotter + NydusTarfsLayer = "containerd.io/snapshot/nydus-tarfs" // Annotation containing secret to pull images from registry, set by the snapshotter. NydusImagePullSecret = "containerd.io/snapshot/pullsecret" // Annotation containing username to pull images from registry, set by the snapshotter. @@ -55,6 +57,9 @@ const ( ) func IsNydusDataLayer(labels map[string]string) bool { + if labels == nil { + return false + } _, ok := labels[NydusDataLayer] return ok } @@ -67,7 +72,15 @@ func IsNydusMetaLayer(labels map[string]string) bool { return ok } -func IsTarfsHint(labels map[string]string) bool { +func IsTarfsDataLayer(labels map[string]string) bool { + if labels == nil { + return false + } + _, ok := labels[NydusTarfsLayer] + return ok +} + +func HasTarfsHint(labels map[string]string) bool { if labels == nil { return false } diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 2ace2fb296..2c7d63e331 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -127,9 +127,8 @@ type Manager struct { // supposed to refilled when nydus-snapshotter restarting. 
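Note on the label.go hunk above: the new IsTarfsDataLayer()/HasTarfsHint() helpers guard against nil label maps. Indexing a nil map is legal in Go and returns the zero value, so the explicit check is purely defensive. A runnable reduction, with the constant copied from the hunk:

    package main

    import "fmt"

    // NydusTarfsLayer mirrors the new constant in the label.go hunk above.
    const NydusTarfsLayer = "containerd.io/snapshot/nydus-tarfs"

    func IsTarfsDataLayer(labels map[string]string) bool {
        if labels == nil { // redundant for map reads, but documents the intent
            return false
        }
        _, ok := labels[NydusTarfsLayer]
        return ok
    }

    func main() {
        fmt.Println(IsTarfsDataLayer(nil))                                        // false
        fmt.Println(IsTarfsDataLayer(map[string]string{NydusTarfsLayer: "true"})) // true
    }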
daemonStates *DaemonStates - monitor LivenessMonitor - // TODO: Close me - LivenessNotifier chan deathEvent + monitor LivenessMonitor + LivenessNotifier chan deathEvent // TODO: Close me RecoverPolicy config.DaemonRecoverPolicy SupervisorSet *supervisor.SupervisorsSet @@ -151,12 +150,10 @@ type Opt struct { Database *store.Database CacheDir string RecoverPolicy config.DaemonRecoverPolicy - // Nydus-snapshotter work directory - RootDir string - DaemonConfig daemonconfig.DaemonConfig - CgroupMgr *cgroup.Manager - // In order to validate daemon fs driver is consistent with the latest snapshotter boot - FsDriver string + RootDir string // Nydus-snapshotter work directory + DaemonConfig daemonconfig.DaemonConfig + CgroupMgr *cgroup.Manager + FsDriver string // In order to validate daemon fs driver is consistent with the latest snapshotter boot } func (m *Manager) doDaemonFailover(d *daemon.Daemon) { @@ -328,6 +325,10 @@ func (m *Manager) NewInstance(r *daemon.Rafs) error { return m.store.AddInstance(r) } +func (m *Manager) RemoveInstance(snapshotID string) error { + return m.store.DeleteInstance(snapshotID) +} + func (m *Manager) Lock() { m.mu.Lock() } @@ -353,10 +354,6 @@ func (m *Manager) UnsubscribeDaemonEvent(d *daemon.Daemon) error { return nil } -func (m *Manager) RemoveInstance(snapshotID string) error { - return m.store.DeleteInstance(snapshotID) -} - func (m *Manager) UpdateDaemon(daemon *daemon.Daemon) error { m.mu.Lock() defer m.mu.Unlock() diff --git a/pkg/metrics/serve.go b/pkg/metrics/serve.go index 6544270d7b..73b68511af 100644 --- a/pkg/metrics/serve.go +++ b/pkg/metrics/serve.go @@ -36,7 +36,9 @@ type Server struct { func WithProcessManager(pm *manager.Manager) ServerOpt { return func(s *Server) error { - s.managers = append(s.managers, pm) + if pm != nil { + s.managers = append(s.managers, pm) + } return nil } } diff --git a/pkg/tarfs/tarfs.go b/pkg/tarfs/tarfs.go index 5a9cb84201..ddbf2ff6ef 100755 --- a/pkg/tarfs/tarfs.go +++ b/pkg/tarfs/tarfs.go @@ -13,6 +13,7 @@ import ( "io" "os" "os/exec" + "path" "path/filepath" "strings" "sync" @@ -22,6 +23,7 @@ import ( "github.com/containerd/containerd/log" "github.com/containerd/containerd/snapshots/storage" "github.com/containerd/nydus-snapshotter/pkg/auth" + "github.com/containerd/nydus-snapshotter/pkg/daemon" "github.com/containerd/nydus-snapshotter/pkg/label" "github.com/containerd/nydus-snapshotter/pkg/remote" "github.com/containerd/nydus-snapshotter/pkg/remote/remotes" @@ -38,6 +40,8 @@ import ( type Manager struct { snapshotMap map[string]*snapshotStatus // tarfs snapshots status, indexed by snapshot ID mutex sync.Mutex + mutexLoopDev sync.Mutex + cacheDirPath string nydusImagePath string insecure bool validateDiffID bool // whether to validate digest for uncompressed content @@ -50,34 +54,37 @@ type Manager struct { } const ( - TarfsStatusInit = 0 - TarfsStatusFormatting = 1 - TarfsStatusReady = 2 - TarfsStatusFailed = 3 + TarfsStatusInit = 0 + TarfsStatusPrepare = 1 + TarfsStatusReady = 2 + TarfsStatusFailed = 3 ) const ( MaxManifestConfigSize = 0x100000 - TarfsBlobName = "blob.tar" - TarfsLayerBootstapName = "layer_bootstrap" - TarfsMeragedBootstapName = "merged_bootstrap" + TarfsLayerBootstapName = "layer.boot" + TarfsMeragedBootstapName = "image.boot" ) var ErrEmptyBlob = errors.New("empty blob") type snapshotStatus struct { + mutex sync.Mutex status int - layerBsLoopdev *losetup.Device - mergedBsLoopdev *losetup.Device + isEmptyBlob bool + blobID string + blobTarFilePath string erofsMountPoint string - erofsMountOpts 
string + dataLoopdev *losetup.Device + metaLoopdev *losetup.Device wg *sync.WaitGroup cancel context.CancelFunc } -func NewManager(insecure, checkTarfsHint bool, nydusImagePath string, maxConcurrentProcess int64) *Manager { +func NewManager(insecure, checkTarfsHint bool, cacheDirPath, nydusImagePath string, maxConcurrentProcess int64) *Manager { return &Manager{ snapshotMap: map[string]*snapshotStatus{}, + cacheDirPath: cacheDirPath, nydusImagePath: nydusImagePath, insecure: insecure, validateDiffID: true, @@ -104,16 +111,16 @@ func (t *Manager) fetchImageInfo(ctx context.Context, remote *remote.Remote, ref } bytes, err := io.ReadAll(rc) if err != nil { - return errors.Wrap(err, "read manifest") + return errors.Wrap(err, "read image manifest content") } - // TODO: support docker images + // TODO: support docker v2 images var manifestOCI ocispec.Manifest if err := json.Unmarshal(bytes, &manifestOCI); err != nil { - return errors.Wrap(err, "unmarshal OCI manifest") + return errors.Wrap(err, "unmarshal OCI image manifest") } if len(manifestOCI.Layers) < 1 { - return errors.Errorf("invalid manifest") + return errors.Errorf("invalid OCI image manifest without any layer") } // fetch image config content and extract diffIDs @@ -135,11 +142,12 @@ func (t *Manager) fetchImageInfo(ctx context.Context, remote *remote.Remote, ref return errors.Wrap(err, "unmarshal image config") } if len(config.RootFS.DiffIDs) != len(manifestOCI.Layers) { - return errors.Errorf("number of diff ids unmatch manifest layers") + return errors.Errorf("number of diffIDs does not match manifest layers") } + if t.checkTarfsHint { // cache ref & tarfs hint annotation - t.tarfsHintCache.Add(ref, label.IsTarfsHint(manifestOCI.Annotations)) + t.tarfsHintCache.Add(ref, label.HasTarfsHint(manifestOCI.Annotations)) } if t.validateDiffID { // cache OCI blob digest & diff id @@ -184,17 +192,28 @@ func (t *Manager) getBlobStream(ctx context.Context, remote *remote.Remote, ref } // generate tar file and layer bootstrap, return if this blob is an empty blob -func (t *Manager) generateBootstrap(tarReader io.Reader, storagePath, snapshotID string) (emptyBlob bool, err error) { - layerBootstrap := filepath.Join(storagePath, TarfsLayerBootstapName) - blob := filepath.Join(storagePath, "layer_"+snapshotID+"_"+TarfsBlobName) +func (t *Manager) generateBootstrap(tarReader io.Reader, snapshotID, layerBlobID, upperDirPath string) (emptyBlob bool, err error) { + snapshotImageDir := filepath.Join(upperDirPath, "image") + if err := os.MkdirAll(snapshotImageDir, 0750); err != nil { + return false, errors.Wrapf(err, "create data dir %s for tarfs snapshot", snapshotImageDir) + } + layerMetaFile := t.layerMetaFilePath(upperDirPath) + if _, err := os.Stat(layerMetaFile); err == nil { + return false, errors.Errorf("tarfs bootstrap file %s for snapshot %s already exists", layerMetaFile, snapshotID) + } + layerMetaFileTmp := layerMetaFile + ".tarfs.tmp" + defer os.Remove(layerMetaFileTmp) - tarFile, err := os.Create(blob) + layerTarFile := t.layerTarFilePath(layerBlobID) + layerTarFileTmp := layerTarFile + ".tarfs.tmp" + tarFile, err := os.Create(layerTarFileTmp) if err != nil { - return false, err + return false, errors.Wrap(err, "create temporary file to store tar stream") } defer tarFile.Close() + defer os.Remove(layerTarFileTmp) - fifoName := filepath.Join(storagePath, "layer_"+snapshotID+"_"+"tar.fifo") + fifoName := filepath.Join(upperDirPath, "layer_"+snapshotID+"_"+"tar.fifo") if err = syscall.Mkfifo(fifoName, 0644); err != nil { return false, err } 
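Note on generateBootstrap(): it feeds the decompressed tar to the external nydus-image converter through the named pipe created above (the copy goroutine and the command invocation follow in the next hunk), so the uncompressed layer never needs a second on-disk copy. A standalone sketch of the same FIFO technique, with `cat` standing in for nydus-image and error handling trimmed:

    package main

    import (
        "fmt"
        "io"
        "os"
        "os/exec"
        "path/filepath"
        "strings"
        "syscall"
    )

    // streamThroughFifo feeds src to an external command via a named pipe,
    // avoiding a second on-disk copy of the decompressed stream.
    func streamThroughFifo(src io.Reader, dir string, argv ...string) error {
        fifo := filepath.Join(dir, "tar.fifo")
        if err := syscall.Mkfifo(fifo, 0644); err != nil {
            return err
        }
        defer os.Remove(fifo)

        copyErr := make(chan error, 1)
        go func() {
            // Opening a FIFO for writing blocks until the consumer opens it.
            f, err := os.OpenFile(fifo, os.O_WRONLY, 0)
            if err != nil {
                copyErr <- err
                return
            }
            defer f.Close() // delivers EOF to the reader once the copy is done
            _, err = io.Copy(f, src)
            copyErr <- err
        }()

        // The consumer receives the FIFO path as its last argument.
        if err := exec.Command(argv[0], append(argv[1:], fifo)...).Run(); err != nil {
            return err
        }
        return <-copyErr
    }

    func main() {
        dir, _ := os.MkdirTemp("", "fifo-demo")
        defer os.RemoveAll(dir)
        err := streamThroughFifo(strings.NewReader("hello tarfs\n"), dir, "cat")
        fmt.Println("stream result:", err)
    }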
@@ -215,9 +234,9 @@ func (t *Manager) generateBootstrap(tarReader io.Reader, storagePath, snapshotID options := []string{ "create", "--type", "tar-tarfs", - "--bootstrap", layerBootstrap, - "--blob-id", "layer_" + snapshotID + "_" + TarfsBlobName, - "--blob-dir", storagePath, + "--bootstrap", layerMetaFileTmp, + "--blob-id", layerBlobID, + "--blob-dir", t.cacheDirPath, fifoName, } cmd := exec.Command(t.nydusImagePath, options...) @@ -228,32 +247,32 @@ func (t *Manager) generateBootstrap(tarReader io.Reader, storagePath, snapshotID err = cmd.Run() if err != nil { log.L.Warnf("nydus image exec failed, %s", errb.String()) - return false, errors.Wrap(err, "converting tarfs layer failed") + return false, errors.Wrap(err, "converting OCIv1 layer blob to tarfs") } log.L.Debugf("nydus image output %s", outb.String()) log.L.Debugf("nydus image err %s", errb.String()) + if err := os.Rename(layerTarFileTmp, layerTarFile); err != nil { + return false, errors.Wrapf(err, "rename file %s to %s", layerTarFileTmp, layerTarFile) + } + if err := os.Rename(layerMetaFileTmp, layerMetaFile); err != nil { + return false, errors.Wrapf(err, "rename file %s to %s", layerMetaFileTmp, layerMetaFile) + } + // TODO need a more reliable way to check if this is an empty blob - if strings.Contains(outb.String(), "data blob size: 0x0") || - strings.Contains(errb.String(), "data blob size: 0x0") { + if strings.Contains(outb.String(), "data blob size: 0x0\n") || + strings.Contains(errb.String(), "data blob size: 0x0\n") { return true, nil } return false, nil } -func (t *Manager) attachLoopdev(blob string) (*losetup.Device, error) { - // losetup.Attach() is not thread-safe hold lock here - t.mutex.Lock() - defer t.mutex.Unlock() - dev, err := losetup.Attach(blob, 0, false) - return &dev, err -} - // download & uncompress an oci/docker blob, and then generate the tarfs bootstrap -func (t *Manager) blobProcess(ctx context.Context, snapshotID, ref string, manifestDigest, layerDigest digest.Digest, storagePath string) (*losetup.Device, error) { +func (t *Manager) blobProcess(ctx context.Context, snapshotID, ref string, manifestDigest, layerDigest digest.Digest, + layerBlobID, upperDirPath string) error { keyChain, err := auth.GetKeyChainByRef(ref, nil) if err != nil { - return nil, err + return err } remote := remote.New(keyChain, t.insecure) @@ -265,34 +284,34 @@ func (t *Manager) blobProcess(ctx context.Context, snapshotID, ref string, manif defer rc.Close() ds, err := compression.DecompressStream(rc) if err != nil { - return false, errors.Wrap(err, "unpack stream") + return false, errors.Wrap(err, "unpack layer blob stream for tarfs") } defer ds.Close() + var emptyBlob bool if t.validateDiffID { diffID, err := t.getBlobDiffID(ctx, remote, ref, manifestDigest, layerDigest) if err != nil { - return false, err + return false, errors.Wrap(err, "get image layer diffID") } digester := digest.Canonical.Digester() dr := io.TeeReader(ds, digester.Hash()) - emptyBlob, err := t.generateBootstrap(dr, storagePath, snapshotID) + emptyBlob, err = t.generateBootstrap(dr, snapshotID, layerBlobID, upperDirPath) if err != nil { - return false, err + return false, errors.Wrap(err, "generate tarfs data from image layer blob") } - log.L.Infof("prepare tarfs Layer generateBootstrap done layer %s, digest %s", snapshotID, digester.Digest()) if digester.Digest() != diffID { - return false, errors.Errorf("diff id %s not match", diffID) + return false, errors.Errorf("image layer diffID %s for tarfs does not match", diffID) } - return emptyBlob, nil + 
log.L.Infof("tarfs data for layer %s is ready, digest %s", snapshotID, digester.Digest()) } else { - emptyBlob, err := t.generateBootstrap(ds, storagePath, snapshotID) + emptyBlob, err = t.generateBootstrap(ds, snapshotID, layerBlobID, upperDirPath) if err != nil { - return false, err + return false, errors.Wrap(err, "generate tarfs data from image layer blob") } - log.L.Infof("prepare tarfs Layer generateBootstrap done layer %s", snapshotID) - return emptyBlob, nil + log.L.Infof("tarfs data for layer %s is ready", snapshotID) } + return emptyBlob, nil } var emptyBlob bool @@ -301,22 +320,20 @@ func (t *Manager) blobProcess(ctx context.Context, snapshotID, ref string, manif emptyBlob, err = handle() } if err != nil { - return nil, err + return err } - - // for empty blob do not need a loop device if emptyBlob { - return nil, ErrEmptyBlob + return ErrEmptyBlob } - - return t.attachLoopdev(filepath.Join(storagePath, "layer_"+snapshotID+"_"+TarfsBlobName)) + return nil } -func (t *Manager) PrepareLayer(snapshotID, ref string, manifestDigest, layerDigest digest.Digest, storagePath string) error { +func (t *Manager) PrepareLayer(snapshotID, ref string, manifestDigest, layerDigest digest.Digest, + upperDirPath string) error { t.mutex.Lock() if _, ok := t.snapshotMap[snapshotID]; ok { t.mutex.Unlock() - return errors.Errorf("snapshot %s already prapared", snapshotID) + return errors.Errorf("snapshot %s has already been prapared", snapshotID) } wg := &sync.WaitGroup{} wg.Add(1) @@ -324,145 +341,200 @@ func (t *Manager) PrepareLayer(snapshotID, ref string, manifestDigest, layerDige ctx, cancel := context.WithCancel(context.Background()) t.snapshotMap[snapshotID] = &snapshotStatus{ - status: TarfsStatusFormatting, + status: TarfsStatusPrepare, wg: wg, cancel: cancel, } t.mutex.Unlock() - loopdev, err := t.blobProcess(ctx, snapshotID, ref, manifestDigest, layerDigest, storagePath) + layerBlobID := layerDigest.Hex() + err := t.blobProcess(ctx, snapshotID, ref, manifestDigest, layerDigest, layerBlobID, upperDirPath) - st, err1 := t.getSnapshotStatus(snapshotID) + st, err1 := t.getSnapshotStatus(snapshotID, true) if err1 != nil { - return errors.Errorf("can not found snapshot status after prepare") + return errors.Errorf("can not found status object for snapshot %s after prepare", snapshotID) } + defer st.mutex.Unlock() + + st.blobID = layerBlobID + st.blobTarFilePath = t.layerTarFilePath(layerBlobID) if err != nil { if errors.Is(err, ErrEmptyBlob) { + st.isEmptyBlob = true st.status = TarfsStatusReady err = nil } else { st.status = TarfsStatusFailed } } else { + st.isEmptyBlob = false st.status = TarfsStatusReady - st.layerBsLoopdev = loopdev } + log.L.Debugf("finish converting snapshot %s to tarfs, status %d, empty blob %v", snapshotID, st.status, st.isEmptyBlob) + return err } func (t *Manager) MergeLayers(s storage.Snapshot, storageLocater func(string) string) error { - mergedBootstrap := filepath.Join(storageLocater(s.ParentIDs[0]), TarfsMeragedBootstapName) + mergedBootstrap := t.mergedMetaFilePath(storageLocater(s.ParentIDs[0])) if _, err := os.Stat(mergedBootstrap); err == nil { - log.L.Infof("tarfs snapshot %s already has merged bootstrap %s", s.ParentIDs[0], mergedBootstrap) + log.L.Debugf("tarfs snapshot %s already has merged bootstrap %s", s.ParentIDs[0], mergedBootstrap) return nil } - var mountOpts string bootstraps := []string{} // When merging bootstrap, we need to arrange layer bootstrap in order from low to high for idx := len(s.ParentIDs) - 1; idx >= 0; idx-- { snapshotID := 
s.ParentIDs[idx] - err := t.waitLayerFormating(snapshotID) + err := t.waitLayerReady(snapshotID) if err != nil { - return errors.Wrapf(err, "wait layer formating err") + return errors.Wrapf(err, "wait for tarfs snapshot %s to get ready", snapshotID) } - st, err := t.getSnapshotStatus(snapshotID) + st, err := t.getSnapshotStatus(snapshotID, false) if err != nil { return err } if st.status != TarfsStatusReady { - return errors.Errorf("snapshot %s tarfs format error %d", snapshotID, st.status) + return errors.Errorf("tarfs snapshot %s is not ready, %d", snapshotID, st.status) } - bootstraps = append(bootstraps, filepath.Join(storageLocater(snapshotID), TarfsLayerBootstapName)) - // mount opt skip empty blob - if st.layerBsLoopdev != nil { - mountOpts += "device=" + st.layerBsLoopdev.Path() + "," - } + metaFilePath := t.layerMetaFilePath(storageLocater(snapshotID)) + bootstraps = append(bootstraps, metaFilePath) } + mergedBootstrapTmp := mergedBootstrap + ".tarfs.tmp" + defer os.Remove(mergedBootstrapTmp) + options := []string{ "merge", - "--bootstrap", mergedBootstrap, + "--bootstrap", mergedBootstrapTmp, } options = append(options, bootstraps...) cmd := exec.Command(t.nydusImagePath, options...) - cmd.Stderr = os.Stderr - cmd.Stdout = os.Stdout + var errb, outb bytes.Buffer + cmd.Stderr = &errb + cmd.Stdout = &outb log.L.Debugf("nydus image command %v", options) err := cmd.Run() if err != nil { - return errors.Wrap(err, "merging tarfs layers") + return errors.Wrap(err, "merge tarfs image layers") } - loopdev, err := t.attachLoopdev(mergedBootstrap) + err = os.Rename(mergedBootstrapTmp, mergedBootstrap) if err != nil { - return errors.Wrap(err, "attach bootstrap to loop error") + return errors.Wrap(err, "rename merged bootstrap file") } - st, err := t.getSnapshotStatus(s.ParentIDs[0]) - if err != nil { - return errors.Errorf("snapshot %s not found", s.ParentIDs[0]) - } - st.mergedBsLoopdev = loopdev - st.erofsMountOpts = mountOpts - return nil } -func (t *Manager) MountTarErofs(snapshotID string, mountPoint string) error { - var devName string +func (t *Manager) attachLoopdev(blob string) (*losetup.Device, error) { + // losetup.Attach() is not thread-safe hold lock here + t.mutexLoopDev.Lock() + defer t.mutexLoopDev.Unlock() + dev, err := losetup.Attach(blob, 0, false) + return &dev, err +} - st, err := t.getSnapshotStatus(snapshotID) +func (t *Manager) MountTarErofs(snapshotID string, s *storage.Snapshot, rafs *daemon.Rafs) error { + if s == nil { + return errors.New("snapshot object for MountTarErofs() is nil") + } + + var devices []string + // When merging bootstrap, we need to arrange layer bootstrap in order from low to high + for idx := len(s.ParentIDs) - 1; idx >= 0; idx-- { + snapshotID := s.ParentIDs[idx] + err := t.waitLayerReady(snapshotID) + if err != nil { + return errors.Wrapf(err, "wait for tarfs conversion task") + } + + st, err := t.getSnapshotStatus(snapshotID, true) + if err != nil { + return err + } + if st.status != TarfsStatusReady { + st.mutex.Unlock() + return errors.Errorf("snapshot %s tarfs format error %d", snapshotID, st.status) + } + + if !st.isEmptyBlob { + if st.dataLoopdev == nil { + loopdev, err := t.attachLoopdev(st.blobTarFilePath) + if err != nil { + st.mutex.Unlock() + return errors.Wrapf(err, "attach layer tar file %s to loopdev", st.blobTarFilePath) + } + st.dataLoopdev = loopdev + } + devices = append(devices, "device="+st.dataLoopdev.Path()) + } + + st.mutex.Unlock() + } + mountOpts := strings.Join(devices, ",") + + st, err := 
t.getSnapshotStatus(snapshotID, true) if err != nil { return err } + defer st.mutex.Unlock() + + mountPoint := path.Join(rafs.GetSnapshotDir(), "mnt") if len(st.erofsMountPoint) > 0 { if st.erofsMountPoint == mountPoint { - log.L.Debugf("erofs tarfs %s already mounted", mountPoint) + log.L.Debugf("tarfs for snapshot %s has already been mounted at %s", snapshotID, mountPoint) return nil } - return errors.Errorf("erofs snapshot %s already mounted at %s", snapshotID, st.erofsMountPoint) + return errors.Errorf("tarfs for snapshot %s has already been mounted at %s", snapshotID, st.erofsMountPoint) } - if err = os.MkdirAll(mountPoint, 0755); err != nil { - return errors.Wrapf(err, "failed to create tarfs mount dir %s", mountPoint) + if st.metaLoopdev == nil { + upperDirPath := path.Join(rafs.GetSnapshotDir(), "fs") + mergedBootstrap := t.mergedMetaFilePath(upperDirPath) + loopdev, err := t.attachLoopdev(mergedBootstrap) + if err != nil { + return errors.Wrapf(err, "attach merged bootstrap %s to loopdev", mergedBootstrap) + } + st.metaLoopdev = loopdev } + devName := st.metaLoopdev.Path() - if st.mergedBsLoopdev != nil { - devName = st.mergedBsLoopdev.Path() - } else { - return errors.Errorf("snapshot %s not found boostrap loopdev error %d", snapshotID, st.status) + if err = os.MkdirAll(mountPoint, 0750); err != nil { + return errors.Wrapf(err, "create tarfs mount dir %s", mountPoint) } - err = unix.Mount(devName, mountPoint, "erofs", 0, st.erofsMountOpts) + err = unix.Mount(devName, mountPoint, "erofs", 0, mountOpts) if err != nil { - return errors.Wrapf(err, "mount erofs at %s with opts %s", mountPoint, st.erofsMountOpts) + return errors.Wrapf(err, "mount erofs at %s with opts %s", mountPoint, mountOpts) } st.erofsMountPoint = mountPoint + rafs.SetMountpoint(mountPoint) return nil } func (t *Manager) UmountTarErofs(snapshotID string) error { - st, err := t.getSnapshotStatus(snapshotID) + st, err := t.getSnapshotStatus(snapshotID, true) if err != nil { return errors.Wrapf(err, "umount a tarfs snapshot %s which is already removed", snapshotID) } + defer st.mutex.Unlock() if len(st.erofsMountPoint) > 0 { err := unix.Unmount(st.erofsMountPoint, 0) if err != nil { - return errors.Wrapf(err, "umount erofs tarfs %s failed", st.erofsMountPoint) + return errors.Wrapf(err, "umount erofs tarfs %s", st.erofsMountPoint) } } st.erofsMountPoint = "" return nil } -func (t *Manager) waitLayerFormating(snapshotID string) error { - log.L.Debugf("wait for tarfs formating snapshot %s", snapshotID) - st, err := t.getSnapshotStatus(snapshotID) +func (t *Manager) waitLayerReady(snapshotID string) error { + log.L.Debugf("wait tarfs conversion task for snapshot %s", snapshotID) + st, err := t.getSnapshotStatus(snapshotID, false) if err != nil { return err } @@ -470,20 +542,24 @@ func (t *Manager) waitLayerFormating(snapshotID string) error { return nil } +func (t *Manager) IsTarfsLayer(snapshotID string) bool { + _, err := t.getSnapshotStatus(snapshotID, false) + return err == nil +} + // check if a snapshot is tarfs layer and if mounted a erofs tarfs -func (t *Manager) CheckTarfsLayer(snapshotID string, merged bool) bool { - st, err := t.getSnapshotStatus(snapshotID) +func (t *Manager) IsMountedTarfsLayer(snapshotID string) bool { + st, err := t.getSnapshotStatus(snapshotID, true) if err != nil { return false } - if merged && len(st.erofsMountPoint) == 0 { - return false - } - return true + defer st.mutex.Unlock() + + return len(st.erofsMountPoint) != 0 } func (t *Manager) DetachLayer(snapshotID string) error { - st, 
err := t.getSnapshotStatus(snapshotID) + st, err := t.getSnapshotStatus(snapshotID, true) if err != nil { return os.ErrNotExist } @@ -491,36 +567,47 @@ func (t *Manager) DetachLayer(snapshotID string) error { if len(st.erofsMountPoint) > 0 { err := unix.Unmount(st.erofsMountPoint, 0) if err != nil { - return errors.Wrapf(err, "umount erofs tarfs %s failed", st.erofsMountPoint) + st.mutex.Unlock() + return errors.Wrapf(err, "umount erofs tarfs %s", st.erofsMountPoint) } } - if st.mergedBsLoopdev != nil { - err := st.mergedBsLoopdev.Detach() + if st.metaLoopdev != nil { + err := st.metaLoopdev.Detach() if err != nil { - return errors.Wrapf(err, "detach merged bootstrap loopdev for tarfs snapshot %s failed", snapshotID) + st.mutex.Unlock() + return errors.Wrapf(err, "detach merged bootstrap loopdev for tarfs snapshot %s", snapshotID) } + st.metaLoopdev = nil } - if st.layerBsLoopdev != nil { - err := st.layerBsLoopdev.Detach() + if st.dataLoopdev != nil { + err := st.dataLoopdev.Detach() if err != nil { - return errors.Wrapf(err, "detach layer bootstrap loopdev for tarfs snapshot %s failed", snapshotID) + st.mutex.Unlock() + return errors.Wrapf(err, "detach layer bootstrap loopdev for tarfs snapshot %s", snapshotID) } + st.dataLoopdev = nil } + + st.mutex.Unlock() + // TODO: check order st.cancel() t.mutex.Lock() - defer t.mutex.Unlock() delete(t.snapshotMap, snapshotID) + t.mutex.Unlock() return nil } -func (t *Manager) getSnapshotStatus(snapshotID string) (*snapshotStatus, error) { +func (t *Manager) getSnapshotStatus(snapshotID string, lock bool) (*snapshotStatus, error) { t.mutex.Lock() defer t.mutex.Unlock() st, ok := t.snapshotMap[snapshotID] if ok { + if lock { + st.mutex.Lock() + } return st, nil } return nil, errors.Errorf("not found snapshot %s", snapshotID) @@ -576,3 +663,15 @@ func (t *Manager) GetConcurrentLimiter(ref string) *semaphore.Weighted { t.processLimiterCache.Add(ref, limiter) return limiter } + +func (t *Manager) layerTarFilePath(blobID string) string { + return filepath.Join(t.cacheDirPath, blobID) +} + +func (t *Manager) layerMetaFilePath(upperDirPath string) string { + return filepath.Join(upperDirPath, "image", TarfsLayerBootstapName) +} + +func (t *Manager) mergedMetaFilePath(upperDirPath string) string { + return filepath.Join(upperDirPath, "image", TarfsMeragedBootstapName) +} diff --git a/snapshot/process.go b/snapshot/process.go index d2d8a0dc43..bade268a3d 100644 --- a/snapshot/process.go +++ b/snapshot/process.go @@ -13,7 +13,6 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" - "github.com/containerd/containerd/log" "github.com/containerd/containerd/mount" snpkg "github.com/containerd/containerd/pkg/snapshotters" "github.com/containerd/containerd/snapshots/storage" @@ -24,9 +23,8 @@ import ( // `storageLocater` provides a local storage for each handler to save their intermediates. // Different actions for different layer types func chooseProcessor(ctx context.Context, logger *logrus.Entry, - sn *snapshotter, s storage.Snapshot, - key, parent string, labels map[string]string, storageLocater func() string) (_ func() (bool, []mount.Mount, error), target string, err error) { - + sn *snapshotter, s storage.Snapshot, key, parent string, labels map[string]string, + storageLocater func() string) (_ func() (bool, []mount.Mount, error), target string, err error) { var handler func() (bool, []mount.Mount, error) // Handler to prepare a directory for containerd to download and unpacking layer. 
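Note on the locking scheme in the tarfs.go hunks above: getSnapshotStatus() now takes a lock flag, so the manager-wide mutex only guards the map lookup, while each snapshotStatus carries its own mutex that can be handed back locked, leaving the caller responsible for unlocking it. A reduced model of this two-level scheme (illustrative names, not the real types):

    package main

    import (
        "errors"
        "fmt"
        "sync"
    )

    type snapshotStatus struct {
        mutex      sync.Mutex
        mountPoint string
    }

    type manager struct {
        mutex       sync.Mutex
        snapshotMap map[string]*snapshotStatus
    }

    // getSnapshotStatus mirrors the shape above: the manager mutex only guards
    // the map lookup; with lock == true the entry is returned with its own
    // mutex held and the caller must unlock it.
    func (t *manager) getSnapshotStatus(id string, lock bool) (*snapshotStatus, error) {
        t.mutex.Lock()
        defer t.mutex.Unlock()
        if st, ok := t.snapshotMap[id]; ok {
            if lock {
                st.mutex.Lock()
            }
            return st, nil
        }
        return nil, errors.New("not found snapshot " + id)
    }

    func main() {
        t := &manager{snapshotMap: map[string]*snapshotStatus{"s1": {}}}
        st, err := t.getSnapshotStatus("s1", true)
        if err != nil {
            fmt.Println(err)
            return
        }
        st.mountPoint = "/mnt" // the field is mutated under st.mutex
        st.mutex.Unlock()
        fmt.Println("mounted at", st.mountPoint)
    }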
@@ -40,10 +38,10 @@ func chooseProcessor(ctx context.Context, logger *logrus.Entry, return true, nil, nil } - remoteHandler := func(id string, labels map[string]string, isTarfs bool) func() (bool, []mount.Mount, error) { + remoteHandler := func(id string, labels map[string]string) func() (bool, []mount.Mount, error) { return func() (bool, []mount.Mount, error) { - logger.Debugf("Found nydus meta layer id %s", id) - if err := sn.prepareRemoteSnapshot(id, labels, isTarfs); err != nil { + logger.Debugf("Prepare remote snapshot %s", id) + if err := sn.prepareRemoteSnapshot(id, labels, s); err != nil { return false, nil, err } @@ -52,12 +50,13 @@ func chooseProcessor(ctx context.Context, logger *logrus.Entry, return false, nil, err } - log.L.Infof("Nydus remote snapshot %s is ready", id) + logger.Infof("Nydus remote snapshot %s is ready", id) mounts, err := sn.remoteMounts(ctx, s, id) return false, mounts, err } } + // OCI image is also marked with "containerd.io/snapshot.ref" by Containerd target, remote := labels[label.TargetSnapshotRef] if remote { @@ -72,54 +71,58 @@ func chooseProcessor(ctx context.Context, logger *logrus.Entry, case sn.fs.CheckReferrer(ctx, labels): logger.Debugf("found referenced nydus manifest") handler = skipHandler - case sn.fs.StargzEnabled(): - // Check if the blob is format of estargz - if ok, blob := sn.fs.IsStargzDataLayer(labels); ok { - err := sn.fs.PrepareStargzMetaLayer(blob, storageLocater(), labels) + default: + if sn.fs.StargzEnabled() { + // Check if the blob is format of estargz + if ok, blob := sn.fs.IsStargzDataLayer(labels); ok { + err := sn.fs.PrepareStargzMetaLayer(blob, storageLocater(), labels) + if err != nil { + logger.Errorf("prepare stargz layer of snapshot ID %s, err: %v", s.ID, err) + } else { + logger.Debugf("found estargz data layer") + // Mark this snapshot as stargz layer since estargz image format does not + // has special annotation or media type. + labels[label.StargzLayer] = "true" + handler = skipHandler + } + } + } + if handler == nil && sn.fs.TarfsEnabled() { + err := sn.fs.PrepareTarfsLayer(ctx, labels, s.ID, sn.upperPath(s.ID)) if err != nil { - logger.Errorf("prepare stargz layer of snapshot ID %s, err: %v", s.ID, err) + logger.Warnf("snapshot ID %s can't be converted into tarfs, fallback to containerd, err: %v", s.ID, err) } else { - // Mark this snapshot as stargz layer since estargz image format does not - // has special annotation or media type. - labels[label.StargzLayer] = "true" + logger.Debugf("convert OCIv1 layer to tarfs") + labels[label.NydusTarfsLayer] = "true" + handler = skipHandler } } - case sn.fs.TarfsEnabled(): - err := sn.fs.PrepareTarfsLayer(ctx, labels, s.ID, sn.upperPath(s.ID)) - if err != nil { - logger.Debugf("snapshot ID %s can't prepare as tarfs fallback to containerd, err: %v", s.ID, err) - handler = defaultHandler - } else { - handler = skipHandler - } - default: - // OCI image is also marked with "containerd.io/snapshot.ref" by Containerd - handler = defaultHandler } } else { - // Container writable layer comes into this branch. It can't be committed within this Prepare + // Container writable layer comes into this branch. + // It should not be committed during this Prepare() operation. 
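Note on the restructured remote-snapshot branch above: it behaves as a fallback chain in which the referrer, stargz, and tarfs preparers each get a chance to claim the layer, and the plain containerd download path runs only when none succeeds. Schematically (all names are illustrative):

    package main

    import "fmt"

    // handler loosely models the func() (bool, []mount.Mount, error) values
    // used above; the bool is simplified to "skip containerd unpacking".
    type handler func() (skip bool, err error)

    // choose walks the preparers in order and falls back to def when none
    // claims the layer, mirroring the nested "if handler == nil" chain.
    func choose(order []string, preparers map[string]func() handler, def handler) (string, handler) {
        for _, name := range order {
            if h := preparers[name](); h != nil {
                return name, h
            }
        }
        return "default", def
    }

    func main() {
        preparers := map[string]func() handler{
            // the layer is not estargz, so this preparer declines it
            "stargz": func() handler { return nil },
            // tarfs conversion succeeds, so containerd unpacking is skipped
            "tarfs": func() handler {
                return func() (bool, error) { return true, nil }
            },
        }
        def := func() (bool, error) { return false, nil }
        name, h := choose([]string{"stargz", "tarfs"}, preparers, def)
        skip, _ := h()
        fmt.Println("handler:", name, "skip unpack:", skip)
    }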
// Hope to find bootstrap layer and prepares to start nydusd // TODO: Trying find nydus meta layer will slow down setting up rootfs to OCI images if id, info, err := sn.findMetaLayer(ctx, key); err == nil { - logger.Infof("Prepares active snapshot %s, nydusd should start afterwards", key) - handler = remoteHandler(id, info.Labels, false) + logger.Infof("Prepare active Nydus snapshot %s", key) + handler = remoteHandler(id, info.Labels) } if handler == nil && sn.fs.ReferrerDetectEnabled() { if id, info, err := sn.findReferrerLayer(ctx, key); err == nil { - logger.Infof("found referenced nydus manifest for image: %s", info.Labels[snpkg.TargetRefLabel]) + logger.Infof("Found referenced nydus manifest for image: %s", info.Labels[snpkg.TargetRefLabel]) metaPath := path.Join(sn.snapshotDir(id), "fs", "image.boot") if err := sn.fs.TryFetchMetadata(ctx, info.Labels, metaPath); err != nil { return nil, "", errors.Wrap(err, "try fetch metadata") } - handler = remoteHandler(id, info.Labels, false) + handler = remoteHandler(id, info.Labels) } } if handler == nil && sn.fs.StargzEnabled() { // `pInfo` must be the uppermost parent layer - _, pInfo, _, err := snapshot.GetSnapshotInfo(ctx, sn.ms, parent) + id, pInfo, _, err := snapshot.GetSnapshotInfo(ctx, sn.ms, parent) if err != nil { return nil, "", errors.Wrap(err, "get parent snapshot info") } @@ -128,6 +131,8 @@ func chooseProcessor(ctx context.Context, logger *logrus.Entry, if err := sn.fs.MergeStargzMetaLayer(ctx, s); err != nil { return nil, "", errors.Wrap(err, "merge stargz meta layers") } + handler = remoteHandler(id, pInfo.Labels) + logger.Infof("Generated estargz merged meta for %s", key) } } @@ -136,18 +141,17 @@ func chooseProcessor(ctx context.Context, logger *logrus.Entry, // tarfs merged & mounted on the uppermost parent layer id, pInfo, _, err := snapshot.GetSnapshotInfo(ctx, sn.ms, parent) - if err == nil { - if sn.fs.IsTarfsLayer(id) { - err := sn.fs.MergeTarfsLayers(s, func(id string) string { return sn.upperPath(id) }) - if err != nil { - return nil, "", errors.Wrap(err, "merge tarfs layers") - } - handler = remoteHandler(id, pInfo.Labels, true) - } else { - logger.Warnf("Tarfs enable but Parent (%s) of snapshot %s is not a tarfs layer, is an untar oci or nydus snapshot?", id, s.ID) - } - } else { + switch { + case err != nil: logger.Warnf("Tarfs enable but can't get snapshot %s Parent, is an untar oci or nydus snapshot?", s.ID) + case !label.IsTarfsDataLayer(pInfo.Labels): + logger.Debugf("Tarfs enable but Parent (%s) of snapshot %s is not a tarfs layer", id, s.ID) + default: + err := sn.fs.MergeTarfsLayers(s, func(id string) string { return sn.upperPath(id) }) + if err != nil { + return nil, "", errors.Wrap(err, "merge tarfs layers") + } + handler = remoteHandler(id, pInfo.Labels) } } } diff --git a/snapshot/snapshot.go b/snapshot/snapshot.go index 3139723ca5..c26dfee408 100644 --- a/snapshot/snapshot.go +++ b/snapshot/snapshot.go @@ -53,12 +53,11 @@ import ( var _ snapshots.Snapshotter = &snapshotter{} type snapshotter struct { - root string - nydusdPath string - // Storing snapshots' state, parentage and other metadata - ms *storage.MetaStore + root string + nydusdPath string + ms *storage.MetaStore // Storing snapshots' state, parentage and other metadata fs *filesystem.Filesystem - manager *mgr.Manager + cgroupManager *cgroup.Manager enableNydusOverlayFS bool syncRemove bool cleanupOnClose bool @@ -102,23 +101,62 @@ func NewSnapshotter(ctx context.Context, cfg *config.SnapshotterConfig) (snapsho } } - manager, err := 
mgr.NewManager(mgr.Opt{ - NydusdBinaryPath: cfg.DaemonConfig.NydusdPath, + blockdevManager, err := mgr.NewManager(mgr.Opt{ + NydusdBinaryPath: "", Database: db, CacheDir: cfg.CacheManagerConfig.CacheDir, RootDir: cfg.Root, RecoverPolicy: rp, - FsDriver: config.GetFsDriver(), + FsDriver: config.FsDriverBlockdev, DaemonConfig: daemonConfig, CgroupMgr: cgroupMgr, }) if err != nil { - return nil, errors.Wrap(err, "create daemons manager") + return nil, errors.Wrap(err, "create blockdevice manager") + } + + var fscacheManager *mgr.Manager + if config.GetFsDriver() == config.FsDriverFscache { + mgr, err := mgr.NewManager(mgr.Opt{ + NydusdBinaryPath: cfg.DaemonConfig.NydusdPath, + Database: db, + CacheDir: cfg.CacheManagerConfig.CacheDir, + RootDir: cfg.Root, + RecoverPolicy: rp, + FsDriver: config.FsDriverFscache, + DaemonConfig: daemonConfig, + CgroupMgr: cgroupMgr, + }) + if err != nil { + return nil, errors.Wrap(err, "create fscache manager") + } + fscacheManager = mgr + } + + var fusedevManager *mgr.Manager + if config.GetFsDriver() == config.FsDriverFusedev { + mgr, err := mgr.NewManager(mgr.Opt{ + NydusdBinaryPath: cfg.DaemonConfig.NydusdPath, + Database: db, + CacheDir: cfg.CacheManagerConfig.CacheDir, + RootDir: cfg.Root, + RecoverPolicy: rp, + FsDriver: config.FsDriverFusedev, + DaemonConfig: daemonConfig, + CgroupMgr: cgroupMgr, + }) + if err != nil { + return nil, errors.Wrap(err, "create fusedev manager") + } + fusedevManager = mgr } metricServer, err := metrics.NewServer( ctx, - metrics.WithProcessManager(manager), + metrics.WithRootDir(cfg.Root), + metrics.WithProcessManager(blockdevManager), + metrics.WithProcessManager(fscacheManager), + metrics.WithProcessManager(fusedevManager), ) if err != nil { return nil, errors.Wrap(err, "create metrics server") @@ -139,7 +177,9 @@ func NewSnapshotter(ctx context.Context, cfg *config.SnapshotterConfig) (snapsho } opts := []filesystem.NewFSOpt{ - filesystem.WithManager(manager), + filesystem.WithManager(blockdevManager), + filesystem.WithManager(fscacheManager), + filesystem.WithManager(fusedevManager), filesystem.WithNydusImageBinaryPath(cfg.DaemonConfig.NydusdPath), filesystem.WithVerifier(verifier), filesystem.WithRootMountpoint(config.GetRootMountpoint()), @@ -169,7 +209,9 @@ func NewSnapshotter(ctx context.Context, cfg *config.SnapshotterConfig) (snapsho if cfg.Experimental.EnableTarfs { // FIXME: get the insecure option from nydusd config. 
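Note on the constructor wiring above: NewSnapshotter() now always creates the blockdev manager (tarfs snapshots may appear under any configuration) and creates the fscache/fusedev managers only when the corresponding driver is configured; the nil-tolerant option helpers then register whichever exist. A compact sketch of that wiring under simplified stand-in types:

    package main

    import "fmt"

    type Manager struct{ FsDriver string }

    // buildManagers mirrors the constructor logic above: blockdev is always
    // created, fscache/fusedev only when configured, and only the managers
    // that exist end up in the registration list.
    func buildManagers(configuredDriver string) []*Manager {
        blockdev := &Manager{FsDriver: "blockdev"}
        var fscache, fusedev *Manager
        switch configuredDriver {
        case "fscache":
            fscache = &Manager{FsDriver: "fscache"}
        case "fusedev":
            fusedev = &Manager{FsDriver: "fusedev"}
        }

        managers := []*Manager{}
        for _, m := range []*Manager{blockdev, fscache, fusedev} {
            if m != nil {
                managers = append(managers, m)
            }
        }
        return managers
    }

    func main() {
        for _, drv := range []string{"fusedev", "fscache"} {
            fmt.Printf("%s config -> %d managers\n", drv, len(buildManagers(drv)))
        }
    }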
_, backendConfig := daemonConfig.StorageBackend() - tarfsMgr := tarfs.NewManager(backendConfig.SkipVerify, cfg.Experimental.TarfsHint, cfg.DaemonConfig.NydusImagePath, int64(cfg.Experimental.TarfsMaxConcurrentProc)) + tarfsMgr := tarfs.NewManager(backendConfig.SkipVerify, cfg.Experimental.TarfsHint, + cacheConfig.CacheDir, cfg.DaemonConfig.NydusImagePath, + int64(cfg.Experimental.TarfsMaxConcurrentProc)) opts = append(opts, filesystem.WithTarfsManager(tarfsMgr)) } @@ -179,7 +221,16 @@ func NewSnapshotter(ctx context.Context, cfg *config.SnapshotterConfig) (snapsho } if config.IsSystemControllerEnabled() { - managers := []*mgr.Manager{manager} + managers := []*mgr.Manager{} + if blockdevManager != nil { + managers = append(managers, blockdevManager) + } + if fscacheManager != nil { + managers = append(managers, fscacheManager) + } + if fusedevManager != nil { + managers = append(managers, fusedevManager) + } systemController, err := system.NewSystemController(nydusFs, managers, config.SystemControllerAddress()) if err != nil { return nil, errors.Wrap(err, "create system controller") @@ -232,7 +283,7 @@ func NewSnapshotter(ctx context.Context, cfg *config.SnapshotterConfig) (snapsho ms: ms, syncRemove: syncRemove, fs: nydusFs, - manager: manager, + cgroupManager: cgroupMgr, enableNydusOverlayFS: cfg.SnapshotsConfig.EnableNydusOverlayFS, cleanupOnClose: cfg.CleanupOnClose, }, nil @@ -283,15 +334,17 @@ func (o *snapshotter) Usage(ctx context.Context, key string) (snapshots.Usage, e usage = snapshots.Usage(du) } - // Blob layers are all committed snapshots - if info.Kind == snapshots.KindCommitted && label.IsNydusDataLayer(info.Labels) { - blobDigest := info.Labels[snpkg.TargetLayerDigestLabel] - // Try to get nydus meta layer/snapshot disk usage - cacheUsage, err := o.fs.CacheUsage(ctx, blobDigest) - if err != nil { - return snapshots.Usage{}, errors.Wrapf(err, "try to get snapshot %s nydus disk usage", id) + // Caculate disk space usage under cacheDir of committed snapshots. 
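Before the Usage() implementation that follows, a reduced model of the accounting it performs, assuming stand-in types and assuming the digest label is containerd's TargetLayerDigestLabel: committed nydus/tarfs data layers add the blob cache footprint, looked up via that label, on top of the disk usage of the snapshot directory:

    package main

    import "fmt"

    type Usage struct{ Size int64 }

    func (u *Usage) Add(other Usage) { u.Size += other.Size }

    // cacheUsage stands in for fs.CacheUsage(): blob cache footprint by digest.
    var cacheUsage = map[string]Usage{"sha256:abc": {Size: 4096}}

    // snapshotUsage: directory usage plus, for committed nydus/tarfs data
    // layers, the cache usage found via the layer digest label.
    func snapshotUsage(dirSize int64, committedDataLayer bool, labels map[string]string) Usage {
        usage := Usage{Size: dirSize}
        if committedDataLayer {
            if digest, ok := labels["containerd.io/snapshot/cri.layer-digest"]; ok {
                usage.Add(cacheUsage[digest])
            }
        }
        return usage
    }

    func main() {
        labels := map[string]string{"containerd.io/snapshot/cri.layer-digest": "sha256:abc"}
        fmt.Println(snapshotUsage(512, true, labels).Size) // 4608
    }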
+ if info.Kind == snapshots.KindCommitted && + (label.IsNydusDataLayer(info.Labels) || label.IsTarfsDataLayer(info.Labels)) { + if blobDigest, ok := info.Labels[snpkg.TargetLayerDigestLabel]; ok { + // Try to get nydus meta layer/snapshot disk usage + cacheUsage, err := o.fs.CacheUsage(ctx, blobDigest) + if err != nil { + return snapshots.Usage{}, errors.Wrapf(err, "try to get snapshot %s nydus disk usage", id) + } + usage.Add(cacheUsage) } - usage.Add(cacheUsage) } return usage, nil @@ -330,6 +383,9 @@ func (o *snapshotter) Mounts(ctx context.Context, key string) ([]mount.Mount, er needRemoteMounts = true metaSnapshotID = id } + } else if label.IsTarfsDataLayer(info.Labels) { + needRemoteMounts = true + metaSnapshotID = id } if info.Kind == snapshots.KindActive && info.Parent != "" { @@ -341,8 +397,7 @@ func (o *snapshotter) Mounts(ctx context.Context, key string) ([]mount.Mount, er } needRemoteMounts = true metaSnapshotID = pID - } - if o.fs.TarfsEnabled() && o.fs.IsMergedTarfsLayer(pID) { + } else if o.fs.TarfsEnabled() && o.fs.IsMountedTarfsLayer(pID) { needRemoteMounts = true metaSnapshotID = pID } @@ -351,7 +406,7 @@ func (o *snapshotter) Mounts(ctx context.Context, key string) ([]mount.Mount, er } } - if o.fs.ReferrerDetectEnabled() { + if o.fs.ReferrerDetectEnabled() && !needRemoteMounts { if id, _, err := o.findReferrerLayer(ctx, key); err == nil { needRemoteMounts = true metaSnapshotID = id @@ -422,7 +477,7 @@ func (o *snapshotter) View(ctx context.Context, key, parent string, opts ...snap // Nydusd might not be running. We should run nydusd to reflect the rootfs. if err = o.fs.WaitUntilReady(pID); err != nil { if errors.Is(err, errdefs.ErrNotFound) { - if err := o.fs.Mount(pID, pInfo.Labels, false); err != nil { + if err := o.fs.Mount(pID, pInfo.Labels, nil); err != nil { return nil, errors.Wrapf(err, "mount rafs, instance id %s", pID) } @@ -440,25 +495,23 @@ func (o *snapshotter) View(ctx context.Context, key, parent string, opts ...snap return nil, errors.New("only can view nydus topmost layer") } // Otherwise, it is OCI snapshots + base, s, err := o.createSnapshot(ctx, snapshots.KindView, key, parent, opts) if err != nil { return nil, err } - if o.fs.TarfsEnabled() { - if o.fs.IsTarfsLayer(pID) { - if !o.fs.IsMergedTarfsLayer(pID) { - if err := o.fs.MergeTarfsLayers(s, func(id string) string { return o.upperPath(id) }); err != nil { - return nil, errors.Wrapf(err, "tarfs merge fail %s", pID) - } - - if err := o.fs.Mount(pID, pInfo.Labels, true); err != nil { - return nil, errors.Wrapf(err, "mount tarfs, snapshot id %s", pID) - } + if o.fs.TarfsEnabled() && label.IsTarfsDataLayer(pInfo.Labels) { + if !o.fs.IsMountedTarfsLayer(pID) { + if err := o.fs.MergeTarfsLayers(s, func(id string) string { return o.upperPath(id) }); err != nil { + return nil, errors.Wrapf(err, "tarfs merge fail %s", pID) + } + if err := o.fs.Mount(pID, pInfo.Labels, &s); err != nil { + return nil, errors.Wrapf(err, "mount tarfs, snapshot id %s", pID) } - needRemoteMounts = true - metaSnapshotID = pID } + needRemoteMounts = true + metaSnapshotID = pID } log.L.Infof("[View] snapshot with key %s parent %s", key, parent) @@ -487,21 +540,18 @@ func (o *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap }() // grab the existing id - id, info, _, err := storage.GetInfo(ctx, key) + id, _, _, err := storage.GetInfo(ctx, key) if err != nil { return err } log.L.Infof("[Commit] snapshot with key %q snapshot id %s", key, id) - var usage fs.Usage - // For OCI compatibility, we calculate disk usage 
and commit the usage to DB. - // Nydus disk usage calculation will be delayed until containerd queries. - if !label.IsNydusMetaLayer(info.Labels) && !label.IsNydusDataLayer(info.Labels) { - usage, err = fs.DiskUsage(ctx, o.upperPath(id)) - if err != nil { - return err - } + // For OCI compatibility, we calculate disk usage of the snapshotDir and commit the usage to DB. + // Nydus disk usage under the cacheDir will be delayed until containerd queries. + usage, err := fs.DiskUsage(ctx, o.upperPath(id)) + if err != nil { + return err } if _, err = storage.CommitActive(ctx, key, name, snapshots.Usage(usage), opts...); err != nil { @@ -544,6 +594,8 @@ func (o *snapshotter) Remove(ctx context.Context, key string) error { if label.IsNydusMetaLayer(info.Labels) { log.L.Infof("[Remove] nydus meta snapshot with key %s snapshot id %s", key, id) + } else if label.IsTarfsDataLayer(info.Labels) { + log.L.Infof("[Remove] nydus tarfs snapshot with key %s snapshot id %s", key, id) } if info.Kind == snapshots.KindCommitted { @@ -608,8 +660,8 @@ func (o *snapshotter) Close() error { o.fs.TryStopSharedDaemon() - if o.manager.CgroupMgr != nil { - if err := o.manager.CgroupMgr.Delete(); err != nil { + if o.cgroupManager != nil { + if err := o.cgroupManager.Delete(); err != nil { log.L.Errorf("failed to destroy cgroup, err %v", err) } } @@ -744,8 +796,8 @@ func overlayMount(options []string) []mount.Mount { } } -func (o *snapshotter) prepareRemoteSnapshot(id string, labels map[string]string, isTarfs bool) error { - return o.fs.Mount(id, labels, isTarfs) +func (o *snapshotter) prepareRemoteSnapshot(id string, labels map[string]string, s storage.Snapshot) error { + return o.fs.Mount(id, labels, &s) } // `s` is the upmost snapshot and `id` refers to the nydus meta snapshot
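For reference, the blockdev mount path that MountTarErofs() implements reduces to: attach each non-empty layer tar and the merged bootstrap to loop devices, then mount erofs with the data devices passed as repeated "device=" options. A hedged sketch, assuming the freddierice/go-losetup package (the losetup.Attach/Path API used in the tarfs.go hunks) and golang.org/x/sys/unix; it needs root and real files to succeed:

    package main

    import (
        "fmt"
        "os"
        "strings"

        losetup "github.com/freddierice/go-losetup"
        "golang.org/x/sys/unix"
    )

    // mountTarErofs condenses the steps above: every non-empty layer tar and
    // the merged bootstrap get a loop device, and the data devices are passed
    // to the erofs mount as repeated "device=" options.
    func mountTarErofs(bootstrap string, layerTars []string, mountPoint string) error {
        var opts []string
        for _, tarPath := range layerTars { // lowest layer first
            dev, err := losetup.Attach(tarPath, 0, false)
            if err != nil {
                return err
            }
            opts = append(opts, "device="+dev.Path())
        }
        meta, err := losetup.Attach(bootstrap, 0, false)
        if err != nil {
            return err
        }
        if err := os.MkdirAll(mountPoint, 0750); err != nil {
            return err
        }
        // The merged bootstrap is the erofs image; layer tars are its data blobs.
        return unix.Mount(meta.Path(), mountPoint, "erofs", 0, strings.Join(opts, ","))
    }

    func main() {
        // Illustrative paths only, not the snapshotter's real layout.
        err := mountTarErofs("/var/lib/nydus/fs/image.boot",
            []string{"/var/lib/nydus/cache/sha256-blob"}, "/mnt/tarfs-demo")
        fmt.Println("mount result:", err)
    }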