tarfs: refine tarfs overall design and implementation details
1) move loopdev setup from Prepare()/Merge() into Mount(), preparing
   to support nodev mode in addition to blockdev.
2) store the tar file in the global cache directory and mount tarfs at
   "mnt" instead of "tarfs", for consistency with the fusedev/fscache
   drivers.
3) use a tempfile/rename mechanism to ease error recovery (see the
   sketch below).
4) use locks to avoid possible race conditions.
5) use mode 0750 instead of 0755 for new directories.
6) associate a Manager with filesystems managed by the blockdev driver.

Signed-off-by: Jiang Liu <gerry@linux.alibaba.com>
jiangliu committed Jul 19, 2023
1 parent 7b4c1d0 commit f77d93b
Showing 9 changed files with 444 additions and 268 deletions.
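Item 3 above is the classic write-to-temp-then-rename pattern: the tar file is first written to a temporary file in the destination directory and then renamed into place, so an interrupted write never leaves a partially written file behind. Below is a minimal Go sketch of the idea with illustrative names (writeFileAtomically and the tarfsutil package are not snapshotter APIs); it also uses the stricter 0750 directory mode from item 5.

package tarfsutil

import (
	"os"
	"path/filepath"
)

// writeFileAtomically writes data to dir/name via a temp file plus rename.
// Rename is atomic within a filesystem, so readers observe either the old
// file or the complete new one, which keeps error recovery simple.
func writeFileAtomically(dir, name string, data []byte) error {
	if err := os.MkdirAll(dir, 0750); err != nil { // 0750 instead of 0755
		return err
	}
	tmp, err := os.CreateTemp(dir, name+".tmp.*")
	if err != nil {
		return err
	}
	defer os.Remove(tmp.Name()) // best-effort cleanup; a no-op after rename
	if _, err := tmp.Write(data); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	return os.Rename(tmp.Name(), filepath.Join(dir, name))
}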
20 changes: 10 additions & 10 deletions pkg/filesystem/config.go
@@ -29,18 +29,18 @@ func WithNydusImageBinaryPath(p string) NewFSOpt {
 
 func WithManager(pm *manager.Manager) NewFSOpt {
 	return func(fs *Filesystem) error {
-		if pm == nil {
-			return errors.New("process manager cannot be nil")
-		}
-
-		if pm.FsDriver == config.FsDriverFusedev {
-			fs.fusedevManager = pm
-		} else if pm.FsDriver == config.FsDriverFscache {
-			fs.fscacheManager = pm
-		}
-
-		fs.enabledManagers = append(fs.enabledManagers, pm)
+		if pm != nil {
+			switch pm.FsDriver {
+			case config.FsDriverBlockdev:
+				fs.blockdevManager = pm
+			case config.FsDriverFscache:
+				fs.fscacheManager = pm
+			case config.FsDriverFusedev:
+				fs.fusedevManager = pm
+			}
+			fs.enabledManagers = append(fs.enabledManagers, pm)
+		}
 
 		return nil
	}
 }
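With this rework, WithManager tolerates a nil manager and registers each non-nil one under its FsDriver, so callers can pass every per-driver manager unconditionally. A hedged usage sketch, assuming the package's NewFSOpt options are applied in order by a NewFileSystem-style constructor (the variable names are illustrative):

// Any of these managers may be nil when its driver is disabled;
// WithManager now skips nil instead of returning an error.
fs, err := filesystem.NewFileSystem(
	ctx,
	filesystem.WithManager(blockdevManager),
	filesystem.WithManager(fscacheManager),
	filesystem.WithManager(fusedevManager),
)
if err != nil {
	return errors.Wrap(err, "initialize filesystem")
}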
49 changes: 29 additions & 20 deletions pkg/filesystem/fs.go
@@ -21,6 +21,7 @@ import (
 
 	"github.com/containerd/containerd/log"
 	"github.com/containerd/containerd/snapshots"
+	"github.com/containerd/containerd/snapshots/storage"
 
 	snpkg "github.com/containerd/containerd/pkg/snapshotters"
 	"github.com/containerd/nydus-snapshotter/config"
@@ -29,6 +30,7 @@ import (
 	"github.com/containerd/nydus-snapshotter/pkg/daemon"
 	"github.com/containerd/nydus-snapshotter/pkg/daemon/types"
 	"github.com/containerd/nydus-snapshotter/pkg/errdefs"
+	"github.com/containerd/nydus-snapshotter/pkg/label"
 	"github.com/containerd/nydus-snapshotter/pkg/manager"
 	"github.com/containerd/nydus-snapshotter/pkg/referrer"
 	"github.com/containerd/nydus-snapshotter/pkg/signature"
@@ -212,21 +214,19 @@ func (fs *Filesystem) WaitUntilReady(snapshotID string) error {
 // Mount will be called when containerd snapshotter prepare remote snapshotter
 // this method will fork nydus daemon and manage it in the internal store, and indexed by snapshotID
 // It must set up all necessary resources during Mount procedure and revoke any step if necessary.
-func (fs *Filesystem) Mount(snapshotID string, labels map[string]string, isTarfs bool) (err error) {
-	// Do not create RAFS instance in case of nodev.
-	if !fs.DaemonBacked() {
-		return nil
-	}
-
+func (fs *Filesystem) Mount(snapshotID string, labels map[string]string, s *storage.Snapshot) (err error) {
 	fsDriver := config.GetFsDriver()
-	if isTarfs {
+	if label.IsTarfsDataLayer(labels) {
 		fsDriver = config.FsDriverBlockdev
+	} else if !fs.DaemonBacked() {
+		fsDriver = config.FsDriverNodev
 	}
 	isSharedFusedev := fsDriver == config.FsDriverFusedev && config.GetDaemonMode() == config.DaemonModeShared
 	useSharedDaemon := fsDriver == config.FsDriverFscache || isSharedFusedev
 
+	// Do not create RAFS instance in case of nodev.
+	if fsDriver == config.FsDriverNodev {
+		return nil
+	}
+
 	var imageID string
 	imageID, ok := labels[snpkg.TargetRefLabel]
 	if !ok {
@@ -258,7 +258,7 @@ func (fs *Filesystem) Mount(snapshotID string, labels map[string]string, isTarfs
 	}()
 
 	fsManager, err := fs.getManager(fsDriver)
-	if err != nil && fsDriver != config.FsDriverBlockdev {
+	if err != nil {
 		return errors.Wrapf(err, "get filesystem manager for snapshot %s", snapshotID)
 	}
 
@@ -347,19 +347,17 @@ func (fs *Filesystem) Mount(snapshotID string, labels map[string]string, isTarfs
 			return errors.Wrapf(err, "mount file system by daemon %s, snapshot %s", d.ID(), snapshotID)
 		}
 	case config.FsDriverBlockdev:
-		mountPoint := path.Join(rafs.GetSnapshotDir(), "tarfs")
-		err = fs.tarfsMgr.MountTarErofs(snapshotID, mountPoint)
+		err = fs.tarfsMgr.MountTarErofs(snapshotID, s, rafs)
 		if err != nil {
 			return errors.Wrapf(err, "mount tarfs for snapshot %s", snapshotID)
 		}
-		rafs.SetMountpoint(mountPoint)
 	default:
 		return errors.Errorf("unknown filesystem driver %s for snapshot %s", fsDriver, snapshotID)
 	}
 
 	// Persist it after associate instance after all the states are calculated.
-	if fsDriver == config.FsDriverFscache || fsDriver == config.FsDriverFusedev {
-		if err := fsManager.NewInstance(rafs); err != nil {
-			return errors.Wrapf(err, "create instance %s", snapshotID)
-		}
+	if err := fsManager.NewInstance(rafs); err != nil {
+		return errors.Wrapf(err, "create instance %s", snapshotID)
 	}
 
 	return nil
@@ -373,12 +371,18 @@ func (fs *Filesystem) Umount(ctx context.Context, snapshotID string) error {
 	}
 
 	fsDriver := instance.GetFsDriver()
+	if fsDriver == config.FsDriverNodev {
+		return nil
+	}
 	fsManager, err := fs.getManager(fsDriver)
-	if err != nil && fsDriver != config.FsDriverBlockdev {
+	if err != nil {
 		return errors.Wrapf(err, "get manager for filesystem instance %s", instance.DaemonID)
 	}
 
-	if fsDriver == config.FsDriverFscache || fsDriver == config.FsDriverFusedev {
+	switch fsDriver {
+	case config.FsDriverFscache:
+		fallthrough
+	case config.FsDriverFusedev:
 		daemon, err := fs.getDaemonByRafs(instance)
 		if err != nil {
 			log.L.Debugf("snapshot %s has no associated nydusd", snapshotID)
@@ -398,10 +402,15 @@ func (fs *Filesystem) Umount(ctx context.Context, snapshotID string) error {
 				return errors.Wrapf(err, "destroy daemon %s", daemon.ID())
 			}
 		}
-	} else if fsDriver == config.FsDriverBlockdev {
+	case config.FsDriverBlockdev:
 		if err := fs.tarfsMgr.UmountTarErofs(snapshotID); err != nil {
 			return errors.Wrapf(err, "umount tar erofs on snapshot %s", snapshotID)
 		}
+		if err := fsManager.RemoveInstance(snapshotID); err != nil {
+			return errors.Wrapf(err, "remove snapshot %s", snapshotID)
+		}
+	default:
+		return errors.Errorf("unknown filesystem driver %s for snapshot %s", fsDriver, snapshotID)
 	}
 	return nil
 }
12 changes: 6 additions & 6 deletions pkg/filesystem/tarfs_adaptor.go
@@ -20,7 +20,7 @@ func (fs *Filesystem) TarfsEnabled() bool {
 	return fs.tarfsMgr != nil
 }
 
-func (fs *Filesystem) PrepareTarfsLayer(ctx context.Context, labels map[string]string, snapshotID, storagePath string) error {
+func (fs *Filesystem) PrepareTarfsLayer(ctx context.Context, labels map[string]string, snapshotID, upperDirPath string) error {
 	ref, ok := labels[snpkg.TargetRefLabel]
 	if !ok {
 		return errors.Errorf("not found image reference label")
@@ -50,8 +50,8 @@ func (fs *Filesystem) PrepareTarfsLayer(ctx context.Context, labels map[string]s
 	}
 
 	go func() {
-		if err := fs.tarfsMgr.PrepareLayer(snapshotID, ref, manifestDigest, layerDigest, storagePath); err != nil {
-			log.L.WithError(err).Errorf("async prepare Tarfs layer of snapshot ID %s", snapshotID)
+		if err := fs.tarfsMgr.PrepareLayer(snapshotID, ref, manifestDigest, layerDigest, upperDirPath); err != nil {
+			log.L.WithError(err).Errorf("async prepare tarfs layer of snapshot ID %s", snapshotID)
 		}
 		if limiter != nil {
 			limiter.Release(1)
@@ -70,9 +70,9 @@ func (fs *Filesystem) DetachTarfsLayer(snapshotID string) error {
 }
 
 func (fs *Filesystem) IsTarfsLayer(snapshotID string) bool {
-	return fs.tarfsMgr.CheckTarfsLayer(snapshotID, false)
+	return fs.tarfsMgr.IsTarfsLayer(snapshotID)
 }
 
-func (fs *Filesystem) IsMergedTarfsLayer(snapshotID string) bool {
-	return fs.tarfsMgr.CheckTarfsLayer(snapshotID, true)
+func (fs *Filesystem) IsMountedTarfsLayer(snapshotID string) bool {
+	return fs.tarfsMgr.IsMountedTarfsLayer(snapshotID)
 }
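PrepareTarfsLayer above launches the unpack in a goroutine and releases a concurrency limiter once it finishes. A minimal sketch of that async-prepare pattern, assuming golang.org/x/sync/semaphore as the limiter type (prepareAsync and the unpack callback are illustrative, not the real PrepareLayer):

package tarfsutil

import (
	"context"
	"log"

	"golang.org/x/sync/semaphore"
)

// prepareAsync throttles concurrent layer unpacks; a nil limiter means
// unthrottled, mirroring the nil checks in the code above.
func prepareAsync(ctx context.Context, limiter *semaphore.Weighted, unpack func() error) error {
	if limiter != nil {
		if err := limiter.Acquire(ctx, 1); err != nil {
			return err
		}
	}
	go func() {
		if err := unpack(); err != nil {
			log.Printf("async prepare tarfs layer: %v", err) // caller has already returned
		}
		if limiter != nil {
			limiter.Release(1)
		}
	}()
	return nil
}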
15 changes: 14 additions & 1 deletion pkg/label/label.go
@@ -34,6 +34,8 @@ const (
 	NydusMetaLayer = "containerd.io/snapshot/nydus-bootstrap"
 	// The referenced blob sha256 in format of `sha256:xxx`, set by image builders.
 	NydusRefLayer = "containerd.io/snapshot/nydus-ref"
+	// A bool flag to mark the layer as a nydus tarfs, set by the snapshotter
+	NydusTarfsLayer = "containerd.io/snapshot/nydus-tarfs"
 	// Annotation containing secret to pull images from registry, set by the snapshotter.
 	NydusImagePullSecret = "containerd.io/snapshot/pullsecret"
 	// Annotation containing username to pull images from registry, set by the snapshotter.
@@ -55,6 +57,9 @@
 )
 
 func IsNydusDataLayer(labels map[string]string) bool {
+	if labels == nil {
+		return false
+	}
 	_, ok := labels[NydusDataLayer]
 	return ok
 }
@@ -67,7 +72,15 @@ func IsNydusMetaLayer(labels map[string]string) bool {
 	return ok
 }
 
-func IsTarfsHint(labels map[string]string) bool {
+func IsTarfsDataLayer(labels map[string]string) bool {
+	if labels == nil {
+		return false
+	}
+	_, ok := labels[NydusTarfsLayer]
+	return ok
+}
+
+func HasTarfsHint(labels map[string]string) bool {
 	if labels == nil {
 		return false
 	}
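A short usage sketch of the new label helpers (the label map below is illustrative):

labels := map[string]string{
	label.NydusTarfsLayer: "true",
}
if label.IsTarfsDataLayer(labels) {
	// Route this snapshot through the blockdev (tarfs) driver.
}

// The nil guards make the predicates safe on unlabeled snapshots:
_ = label.IsNydusDataLayer(nil) // false
_ = label.IsTarfsDataLayer(nil) // false
_ = label.HasTarfsHint(nil)     // false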
23 changes: 10 additions & 13 deletions pkg/manager/manager.go
@@ -127,9 +127,8 @@ type Manager struct {
 	// supposed to refilled when nydus-snapshotter restarting.
 	daemonStates *DaemonStates
 
-	monitor LivenessMonitor
-	// TODO: Close me
-	LivenessNotifier chan deathEvent
+	monitor          LivenessMonitor
+	LivenessNotifier chan deathEvent // TODO: Close me
 	RecoverPolicy    config.DaemonRecoverPolicy
 	SupervisorSet    *supervisor.SupervisorsSet
 
@@ -151,12 +150,10 @@ type Opt struct {
 	Database      *store.Database
 	CacheDir      string
 	RecoverPolicy config.DaemonRecoverPolicy
-	// Nydus-snapshotter work directory
-	RootDir      string
-	DaemonConfig daemonconfig.DaemonConfig
-	CgroupMgr    *cgroup.Manager
-	// In order to validate daemon fs driver is consistent with the latest snapshotter boot
-	FsDriver string
+	RootDir       string // Nydus-snapshotter work directory
+	DaemonConfig  daemonconfig.DaemonConfig
+	CgroupMgr     *cgroup.Manager
+	FsDriver      string // In order to validate daemon fs driver is consistent with the latest snapshotter boot
 }
 
 func (m *Manager) doDaemonFailover(d *daemon.Daemon) {
@@ -328,6 +325,10 @@ func (m *Manager) NewInstance(r *daemon.Rafs) error {
 	return m.store.AddInstance(r)
 }
 
+func (m *Manager) RemoveInstance(snapshotID string) error {
+	return m.store.DeleteInstance(snapshotID)
+}
+
 func (m *Manager) Lock() {
 	m.mu.Lock()
 }
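Lock and Unlock expose the manager's mutex, which is what item 4 of the commit message leans on to serialize instance bookkeeping. A hypothetical caller-side sketch (persistInstance is illustrative, not a real method; errors is github.com/pkg/errors, whose Wrapf returns nil for a nil error):

// Serialize RAFS instance bookkeeping across concurrent snapshot operations.
func persistInstance(m *manager.Manager, rafs *daemon.Rafs, snapshotID string) error {
	m.Lock()
	defer m.Unlock()
	return errors.Wrapf(m.NewInstance(rafs), "create instance %s", snapshotID)
}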
@@ -353,10 +354,6 @@ func (m *Manager) UnsubscribeDaemonEvent(d *daemon.Daemon) error {
 	return nil
 }
 
-func (m *Manager) RemoveInstance(snapshotID string) error {
-	return m.store.DeleteInstance(snapshotID)
-}
-
 func (m *Manager) UpdateDaemon(daemon *daemon.Daemon) error {
 	m.mu.Lock()
 	defer m.mu.Unlock()
4 changes: 3 additions & 1 deletion pkg/metrics/serve.go
@@ -36,7 +36,9 @@ type Server struct {
 
 func WithProcessManager(pm *manager.Manager) ServerOpt {
 	return func(s *Server) error {
-		s.managers = append(s.managers, pm)
+		if pm != nil {
+			s.managers = append(s.managers, pm)
+		}
 		return nil
 	}
 }