diff --git a/pkg/filesystem/fs.go b/pkg/filesystem/fs.go index a58742ba9b..fcc31488b6 100644 --- a/pkg/filesystem/fs.go +++ b/pkg/filesystem/fs.go @@ -351,7 +351,7 @@ func (fs *Filesystem) Mount(ctx context.Context, snapshotID string, labels map[s err = errors.Wrapf(err, "mount file system by daemon %s, snapshot %s", d.ID(), snapshotID) } case config.FsDriverBlockdev: - err = fs.tarfsMgr.MountTarErofs(snapshotID, s, labels, rafs) + err = fs.tarfsMgr.MountErofs(snapshotID, s, labels, rafs) if err != nil { err = errors.Wrapf(err, "mount tarfs for snapshot %s", snapshotID) } diff --git a/pkg/label/label.go b/pkg/label/label.go index f5392771d4..ccad8f567e 100644 --- a/pkg/label/label.go +++ b/pkg/label/label.go @@ -36,6 +36,8 @@ const ( NydusRefLayer = "containerd.io/snapshot/nydus-ref" // The blobID of associated layer, also marking the layer as a nydus tarfs, set by the snapshotter NydusTarfsLayer = "containerd.io/snapshot/nydus-tarfs" + // List of parent snapshot IDs, saved in `Rafs.Annotation`. 
+ NydusTarfsParents = "containerd.io/snapshot/nydus-tarfs-parent-snapshot-list" // Dm-verity information for image block device NydusImageBlockInfo = "containerd.io/snapshot/nydus-image-block" // Dm-verity information for layer block device diff --git a/pkg/tarfs/tarfs.go b/pkg/tarfs/tarfs.go index 07615f20cf..cd2415047a 100755 --- a/pkg/tarfs/tarfs.go +++ b/pkg/tarfs/tarfs.go @@ -57,6 +57,7 @@ const ( ) type Manager struct { + RemountMap map[string]*rafs.Rafs // Scratch space to store rafs instances needing remount on startup snapshotMap map[string]*snapshotStatus // tarfs snapshots status, indexed by snapshot ID mutex sync.Mutex mutexLoopDev sync.Mutex @@ -87,6 +88,7 @@ type snapshotStatus struct { func NewManager(insecure, checkTarfsHint bool, cacheDirPath, nydusImagePath string, maxConcurrentProcess int64) *Manager { return &Manager{ snapshotMap: map[string]*snapshotStatus{}, + RemountMap: map[string]*rafs.Rafs{}, cacheDirPath: cacheDirPath, nydusImagePath: nydusImagePath, insecure: insecure, @@ -363,10 +365,9 @@ func (t *Manager) blobProcess(ctx context.Context, snapshotID, ref string, err = t.generateBootstrap(ds, snapshotID, layerBlobID, upperDirPath) if err != nil && !errdefs.IsAlreadyExists(err) { return epilog(err, "generate tarfs data from image layer blob") - } else { - msg := fmt.Sprintf("Nydus tarfs for snapshot %s is ready", snapshotID) - return epilog(nil, msg) } + msg := fmt.Sprintf("Nydus tarfs for snapshot %s is ready", snapshotID) + return epilog(nil, msg) } } @@ -389,7 +390,9 @@ func (t *Manager) blobProcess(ctx context.Context, snapshotID, ref string, } else { // Download and convert layer content in background. // Will retry when the content is actually needed if the background process failed. 
- go process(rc, remote) + go func() { + _ = process(rc, remote) + }() } return err @@ -418,7 +421,7 @@ func (t *Manager) retryPrepareLayer(snapshotID, upperDirPath string, labels map[ case TarfsStatusPrepare: log.L.Infof("Another thread is retrying snapshot %s, wait for the result", snapshotID) st.mutex.Unlock() - st, err = t.waitLayerReady(snapshotID, false) + _, err = t.waitLayerReady(snapshotID, false) return err case TarfsStatusReady: log.L.Infof("Another thread has retried snapshot %s and succeed", snapshotID) @@ -622,7 +625,7 @@ func (t *Manager) ExportBlockData(s storage.Snapshot, perLayer bool, labels map[ return updateFields, nil } -func (t *Manager) MountTarErofs(snapshotID string, s *storage.Snapshot, labels map[string]string, rafs *rafs.Rafs) error { +func (t *Manager) MountErofs(snapshotID string, s *storage.Snapshot, labels map[string]string, rafs *rafs.Rafs) error { if s == nil { return errors.New("snapshot object for MountTarErofs() is nil") } @@ -643,6 +646,7 @@ func (t *Manager) MountTarErofs(snapshotID string, s *storage.Snapshot, labels m } var devices []string + var parents []string // When merging bootstrap, we need to arrange layer bootstrap in order from low to high for idx := len(s.ParentIDs) - 1; idx >= 0; idx-- { snapshotID := s.ParentIDs[idx] @@ -663,10 +667,89 @@ func (t *Manager) MountTarErofs(snapshotID string, s *storage.Snapshot, labels m st.dataLoopdev = loopdev } devices = append(devices, "device="+st.dataLoopdev.Name()) + parents = append(parents, snapshotID) + } + + st.mutex.Unlock() + } + parentList := strings.Join(parents, ",") + devices = append(devices, "ro") + mountOpts := strings.Join(devices, ",") + + st, err := t.getSnapshotStatus(snapshotID, true) + if err != nil { + return err + } + defer st.mutex.Unlock() + + mountPoint := path.Join(rafs.GetSnapshotDir(), "mnt") + if len(st.erofsMountPoint) > 0 { + if st.erofsMountPoint == mountPoint { + log.L.Debugf("tarfs for snapshot %s has already been mounted at %s", snapshotID, 
mountPoint) + return nil + } + return errors.Errorf("tarfs for snapshot %s has already been mounted at %s", snapshotID, st.erofsMountPoint) + } + + if st.metaLoopdev == nil { + loopdev, err := t.attachLoopdev(mergedBootstrap) + if err != nil { + return errors.Wrapf(err, "attach merged bootstrap %s to loopdev", mergedBootstrap) + } + st.metaLoopdev = loopdev + } + devName := st.metaLoopdev.Name() + + if err = os.MkdirAll(mountPoint, 0750); err != nil { + return errors.Wrapf(err, "create tarfs mount dir %s", mountPoint) + } + + err = unix.Mount(devName, mountPoint, "erofs", 0, mountOpts) + if err != nil { + return errors.Wrapf(err, "mount erofs at %s with opts %s", mountPoint, mountOpts) + } + st.erofsMountPoint = mountPoint + rafs.SetMountpoint(mountPoint) + rafs.AddAnnotation(label.NydusTarfsParents, parentList) + return nil +} + +func (t *Manager) RemountErofs(snapshotID string, rafs *rafs.Rafs) error { + upperDirPath := path.Join(rafs.GetSnapshotDir(), "fs") + + log.L.Infof("remount EROFS for tarfs snapshot %s at %s", snapshotID, upperDirPath) + var parents []string + if parentList, ok := rafs.Annotations[label.NydusTarfsParents]; ok { + parents = strings.Split(parentList, ",") + } else { + if !config.GetTarfsMountOnHost() { + rafs.SetMountpoint(upperDirPath) } + return nil + } + + var devices []string + for idx := 0; idx < len(parents); idx++ { + snapshotID := parents[idx] + st, err := t.waitLayerReady(snapshotID, true) + if err != nil { + return errors.Wrapf(err, "wait for tarfs conversion task") + } + + if st.dataLoopdev == nil { + blobTarFilePath := t.layerTarFilePath(st.blobID) + loopdev, err := t.attachLoopdev(blobTarFilePath) + if err != nil { + st.mutex.Unlock() + return errors.Wrapf(err, "attach layer tar file %s to loopdev", blobTarFilePath) + } + st.dataLoopdev = loopdev + } + devices = append(devices, "device="+st.dataLoopdev.Name()) st.mutex.Unlock() } + devices = append(devices, "ro") mountOpts := strings.Join(devices, ",") st, err := 
t.getSnapshotStatus(snapshotID, true) @@ -685,6 +768,7 @@ func (t *Manager) MountTarErofs(snapshotID string, s *storage.Snapshot, labels m } if st.metaLoopdev == nil { + mergedBootstrap := t.imageMetaFilePath(upperDirPath) loopdev, err := t.attachLoopdev(mergedBootstrap) if err != nil { return errors.Wrapf(err, "attach merged bootstrap %s to loopdev", mergedBootstrap) @@ -767,6 +851,43 @@ func (t *Manager) DetachLayer(snapshotID string) error { return nil } +func (t *Manager) RecoverSnapshoInfo(ctx context.Context, id string, info snapshots.Info, upperPath string) error { + t.mutex.Lock() + defer t.mutex.Unlock() + log.L.Infof("recover tarfs snapshot %s with path %s", id, upperPath) + + if _, ok := t.snapshotMap[id]; ok { + // RecoverSnapshotInfo() is called after RecoverRafsInstance(), so some snapshots may already exist. + return nil + } + + layerMetaFilePath := t.layerMetaFilePath(upperPath) + if _, err := os.Stat(layerMetaFilePath); err == nil { + layerDigest := digest.Digest(info.Labels[label.CRILayerDigest]) + if layerDigest.Validate() != nil { + return errors.Errorf("not found layer digest label") + } + ctx, cancel := context.WithCancel(context.Background()) + t.snapshotMap[id] = &snapshotStatus{ + status: TarfsStatusReady, + blobID: layerDigest.Hex(), + cancel: cancel, + ctx: ctx, + } + } else { + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + wg.Add(1) + t.snapshotMap[id] = &snapshotStatus{ + status: TarfsStatusFailed, + wg: wg, + cancel: cancel, + ctx: ctx, + } + } + return nil +} + // This method is called in single threaded mode during startup, so we do not lock `snapshotStatus` objects.
func (t *Manager) RecoverRafsInstance(r *rafs.Rafs) error { t.mutex.Lock() diff --git a/snapshot/process.go b/snapshot/process.go index ebab3c3946..6a97b2cbfd 100644 --- a/snapshot/process.go +++ b/snapshot/process.go @@ -157,7 +157,7 @@ func chooseProcessor(ctx context.Context, logger *logrus.Entry, // which have already been prepared by overlay snapshotter logger.Infof("Prepare active snapshot %s in Nydus tarfs mode", key) - err = sn.mergeTarfs(ctx, s, parent, pID, pInfo) + err = sn.mergeTarfs(ctx, s, parent, pInfo) if err != nil { return nil, "", errors.Wrapf(err, "merge tarfs layers for snapshot %s", pID) } diff --git a/snapshot/snapshot.go b/snapshot/snapshot.go index 5e1bb5c2c4..c10200cc28 100644 --- a/snapshot/snapshot.go +++ b/snapshot/snapshot.go @@ -34,6 +34,7 @@ import ( "github.com/containerd/nydus-snapshotter/pkg/metrics" "github.com/containerd/nydus-snapshotter/pkg/metrics/collector" "github.com/containerd/nydus-snapshotter/pkg/pprof" + "github.com/containerd/nydus-snapshotter/pkg/rafs" "github.com/containerd/nydus-snapshotter/pkg/referrer" "github.com/containerd/nydus-snapshotter/pkg/system" "github.com/containerd/nydus-snapshotter/pkg/tarfs" @@ -225,8 +226,9 @@ func NewSnapshotter(ctx context.Context, cfg *config.SnapshotterConfig) (snapsho opts = append(opts, filesystem.WithReferrerManager(referrerMgr)) } + var tarfsMgr *tarfs.Manager if cfg.Experimental.TarfsConfig.EnableTarfs { - tarfsMgr := tarfs.NewManager(skipSSLVerify, cfg.Experimental.TarfsConfig.TarfsHint, + tarfsMgr = tarfs.NewManager(skipSSLVerify, cfg.Experimental.TarfsConfig.TarfsHint, cacheConfig.CacheDir, cfg.DaemonConfig.NydusImagePath, int64(cfg.Experimental.TarfsConfig.MaxConcurrentProc)) opts = append(opts, filesystem.WithTarfsManager(tarfsMgr)) @@ -284,7 +286,7 @@ func NewSnapshotter(ctx context.Context, cfg *config.SnapshotterConfig) (snapsho syncRemove = true } - return &snapshotter{ + snapshotter := &snapshotter{ root: cfg.Root, nydusdPath: cfg.DaemonConfig.NydusdPath, ms: ms, 
@@ -294,7 +296,41 @@ func NewSnapshotter(ctx context.Context, cfg *config.SnapshotterConfig) (snapsho enableNydusOverlayFS: cfg.SnapshotsConfig.EnableNydusOverlayFS, enableKataVolume: cfg.SnapshotsConfig.EnableKataVolume, cleanupOnClose: cfg.CleanupOnClose, - }, nil + } + + // There's a special requirement to recover tarfs instance because it depends on `snapshotter.ms` + // so it can't be done in `Filesystem.Recover()` + if tarfsMgr != nil { + snapshotter.recoverTarfs(ctx, tarfsMgr) + } + + return snapshotter, nil +} + +func (o *snapshotter) recoverTarfs(ctx context.Context, tarfsMgr *tarfs.Manager) { + // First recover all snapshot information related to tarfs, mount operation depends on snapshots. + _ = o.Walk(ctx, func(ctx context.Context, i snapshots.Info) error { + if _, ok := i.Labels[label.NydusTarfsLayer]; ok { + id, _, _, err := snapshot.GetSnapshotInfo(ctx, o.ms, i.Name) + if err != nil { + return errors.Wrapf(err, "get id for snapshot %s", i.Name) + } + log.L.Infof("found tarfs snapshot %s with name %s", id, i.Name) + upperPath := o.upperPath(id) + if err = tarfsMgr.RecoverSnapshoInfo(ctx, id, i, upperPath); err != nil { + return errors.Wrapf(err, "get id for snapshot %s", i.Name) + } + } + return nil + }) + + for id, r := range tarfsMgr.RemountMap { + log.L.Infof("remount tarfs snapshot %s", id) + if err := tarfsMgr.RemountErofs(id, r); err != nil { + log.L.Warnf("failed to remount EROFS filesystem for tarfs, %s", err) + } + } + tarfsMgr.RemountMap = map[string]*rafs.Rafs{} } func (o *snapshotter) Cleanup(ctx context.Context) error { @@ -519,7 +555,7 @@ func (o *snapshotter) View(ctx context.Context, key, parent string, opts ...snap if o.fs.TarfsEnabled() && label.IsTarfsDataLayer(pInfo.Labels) { log.L.Infof("Prepare view snapshot %s in Nydus tarfs mode", pID) - err = o.mergeTarfs(ctx, s, parent, pID, pInfo) + err = o.mergeTarfs(ctx, s, parent, pInfo) if err != nil { return nil, errors.Wrapf(err, "merge tarfs layers for snapshot %s", pID) } @@ -796,7
+832,7 @@ func (o *snapshotter) createSnapshot(ctx context.Context, kind snapshots.Kind, k return &base, s, nil } -func (o *snapshotter) mergeTarfs(ctx context.Context, s storage.Snapshot, parent, pID string, pInfo snapshots.Info) error { +func (o *snapshotter) mergeTarfs(ctx context.Context, s storage.Snapshot, parent string, pInfo snapshots.Info) error { infoGetter := func(ctx context.Context, id string) (string, snapshots.Info, error) { for { id2, info, _, err := snapshot.GetSnapshotInfo(ctx, o.ms, parent)