Skip to content

Commit

Permalink
Merge pull request #528 from jiangliu/rafs
Browse files Browse the repository at this point in the history
Make RAFS independent of daemon
  • Loading branch information
changweige authored Sep 8, 2023
2 parents 9a2575b + d646992 commit a2d744b
Show file tree
Hide file tree
Showing 19 changed files with 559 additions and 522 deletions.
129 changes: 58 additions & 71 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/containerd/nydus-snapshotter/config/daemonconfig"
"github.com/containerd/nydus-snapshotter/pkg/daemon/types"
"github.com/containerd/nydus-snapshotter/pkg/errdefs"
"github.com/containerd/nydus-snapshotter/pkg/rafs"
"github.com/containerd/nydus-snapshotter/pkg/supervisor"
"github.com/containerd/nydus-snapshotter/pkg/utils/erofs"
"github.com/containerd/nydus-snapshotter/pkg/utils/mount"
Expand All @@ -37,35 +38,35 @@ const (

type NewDaemonOpt func(d *Daemon) error

type States struct {
// Fields in this structure should be write-once, and caller should hold `Daemon.mu` when updating fields.
type ConfigState struct {
// A unique ID generated by daemon manager to identify the nydusd instance.
ID string
ProcessID int
APISocket string
DaemonMode config.DaemonMode
FsDriver string
LogDir string
LogLevel string
LogRotationSize int
LogToStdout bool
DaemonMode config.DaemonMode
FsDriver string
// Fusedev mountpoint on host kernel, the fscache fs driver doesn't need a host kernel mountpoint.
Mountpoint string
ThreadNum int
Mountpoint string
SupervisorPath string
ThreadNum int
// Where the configuration file resides, all rafs instances share the same configuration template
ConfigDir string
SupervisorPath string
ConfigDir string
}

// TODO: Record queried nydusd state
type Daemon struct {
States States
States ConfigState

mu sync.Mutex
// Host all RAFS filesystems managed by this daemon:
// fusedev dedicated mode: one and only one RAFS instance
// fusedev shared mode: zero, one or more RAFS instances
// fscache shared mode: zero, one or more RAFS instances
Instances rafsSet
RafsCache rafs.Cache

// Protect nydusd http client
cmu sync.Mutex
Expand Down Expand Up @@ -140,14 +141,14 @@ func (d *Daemon) LogFile() string {
return filepath.Join(d.States.LogDir, "nydusd.log")
}

func (d *Daemon) AddInstance(r *Rafs) {
d.Instances.Add(r)
func (d *Daemon) AddRafsInstance(r *rafs.Rafs) {
d.RafsCache.Add(r)
d.IncRef()
r.DaemonID = d.ID()
}

func (d *Daemon) RemoveInstance(snapshotID string) {
d.Instances.Remove(snapshotID)
func (d *Daemon) RemoveRafsInstance(snapshotID string) {
d.RafsCache.Remove(snapshotID)
d.DecRef()
}

Expand Down Expand Up @@ -222,7 +223,7 @@ func (d *Daemon) IsSharedDaemon() bool {
return d.HostMountpoint() == config.GetRootMountpoint()
}

func (d *Daemon) SharedMount(rafs *Rafs) error {
func (d *Daemon) SharedMount(rafs *rafs.Rafs) error {
defer d.SendStates()

switch d.States.FsDriver {
Expand All @@ -238,7 +239,7 @@ func (d *Daemon) SharedMount(rafs *Rafs) error {
}
}

func (d *Daemon) sharedFusedevMount(rafs *Rafs) error {
func (d *Daemon) sharedFusedevMount(rafs *rafs.Rafs) error {
client, err := d.GetClient()
if err != nil {
return errors.Wrapf(err, "mount instance %s", rafs.SnapshotID)
Expand Down Expand Up @@ -268,20 +269,20 @@ func (d *Daemon) sharedFusedevMount(rafs *Rafs) error {
return nil
}

func (d *Daemon) sharedErofsMount(rafs *Rafs) error {
func (d *Daemon) sharedErofsMount(ra *rafs.Rafs) error {
client, err := d.GetClient()
if err != nil {
return errors.Wrapf(err, "bind blob %s", d.ID())
}

// TODO: Why fs cache needing this work dir?
if err := os.MkdirAll(rafs.FscacheWorkDir(), 0755); err != nil {
return errors.Wrapf(err, "failed to create fscache work dir %s", rafs.FscacheWorkDir())
if err := os.MkdirAll(ra.FscacheWorkDir(), 0755); err != nil {
return errors.Wrapf(err, "failed to create fscache work dir %s", ra.FscacheWorkDir())
}

c, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile(rafs.SnapshotID))
c, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile(ra.SnapshotID))
if err != nil {
log.L.Errorf("Failed to reload daemon configuration %s, %s", d.ConfigFile(rafs.SnapshotID), err)
log.L.Errorf("Failed to reload daemon configuration %s, %s", d.ConfigFile(ra.SnapshotID), err)
return err
}

Expand All @@ -294,20 +295,20 @@ func (d *Daemon) sharedErofsMount(rafs *Rafs) error {
return errors.Wrapf(err, "request to bind fscache blob")
}

mountPoint := rafs.GetMountpoint()
mountPoint := ra.GetMountpoint()
if err := os.MkdirAll(mountPoint, 0755); err != nil {
return errors.Wrapf(err, "create mountpoint %s", mountPoint)
}

bootstrapPath, err := rafs.BootstrapFile()
bootstrapPath, err := ra.BootstrapFile()
if err != nil {
return err
}
fscacheID := erofs.FscacheID(rafs.SnapshotID)
fscacheID := erofs.FscacheID(ra.SnapshotID)

cfg := c.(*daemonconfig.FscacheDaemonConfig)
rafs.AddAnnotation(AnnoFsCacheDomainID, cfg.DomainID)
rafs.AddAnnotation(AnnoFsCacheID, fscacheID)
ra.AddAnnotation(rafs.AnnoFsCacheDomainID, cfg.DomainID)
ra.AddAnnotation(rafs.AnnoFsCacheID, fscacheID)

if err := erofs.Mount(bootstrapPath, cfg.DomainID, fscacheID, mountPoint); err != nil {
if !errdefs.IsErofsMounted(err) {
Expand All @@ -323,7 +324,7 @@ func (d *Daemon) sharedErofsMount(rafs *Rafs) error {
return nil
}

func (d *Daemon) SharedUmount(rafs *Rafs) error {
func (d *Daemon) SharedUmount(rafs *rafs.Rafs) error {
defer d.SendStates()

switch d.States.FsDriver {
Expand All @@ -343,19 +344,19 @@ func (d *Daemon) SharedUmount(rafs *Rafs) error {
}
}

func (d *Daemon) sharedErofsUmount(rafs *Rafs) error {
func (d *Daemon) sharedErofsUmount(ra *rafs.Rafs) error {
c, err := d.GetClient()
if err != nil {
return errors.Wrapf(err, "unbind blob %s", d.ID())
}
domainID := rafs.Annotations[AnnoFsCacheDomainID]
fscacheID := rafs.Annotations[AnnoFsCacheID]
domainID := ra.Annotations[rafs.AnnoFsCacheDomainID]
fscacheID := ra.Annotations[rafs.AnnoFsCacheID]

if err := c.UnbindBlob(domainID, fscacheID); err != nil {
return errors.Wrapf(err, "request to unbind fscache blob, domain %s, fscache %s", domainID, fscacheID)
}

mountpoint := rafs.GetMountpoint()
mountpoint := ra.GetMountpoint()
if err := erofs.Umount(mountpoint); err != nil {
return errors.Wrapf(err, "umount erofs %s mountpoint, %s", err, mountpoint)
}
Expand All @@ -369,7 +370,7 @@ func (d *Daemon) sharedErofsUmount(rafs *Rafs) error {
return nil
}

func (d *Daemon) UmountInstance(r *Rafs) error {
func (d *Daemon) UmountRafsInstance(r *rafs.Rafs) error {
if d.IsSharedDaemon() {
if err := d.SharedUmount(r); err != nil {
return errors.Wrapf(err, "umount fs instance %s", r.SnapshotID)
Expand All @@ -379,12 +380,12 @@ func (d *Daemon) UmountInstance(r *Rafs) error {
return nil
}

func (d *Daemon) UmountAllInstances() error {
func (d *Daemon) UmountRafsInstances() error {
if d.IsSharedDaemon() {
d.Instances.Lock()
defer d.Instances.Unlock()
d.RafsCache.Lock()
defer d.RafsCache.Unlock()

instances := d.Instances.ListLocked()
instances := d.RafsCache.ListLocked()

for _, r := range instances {
if err := d.SharedUmount(r); err != nil {
Expand Down Expand Up @@ -505,35 +506,27 @@ func (d *Daemon) GetClient() (NydusdClient, error) {
d.cmu.Lock()
defer d.cmu.Unlock()

if err := d.ensureClientUnlocked(); err != nil {
return nil, err
}

return d.client, nil
}

func (d *Daemon) ResetClient() {
d.cmu.Lock()
d.client = nil
d.cmu.Unlock()
}

// The client should be locked outside
func (d *Daemon) ensureClientUnlocked() error {
if d.client == nil {
sock := d.GetAPISock()
// The socket file may be residual from a dead nydusd
err := WaitUntilSocketExisted(sock, d.Pid())
if err != nil {
return errors.Wrapf(errdefs.ErrNotFound, "daemon socket %s", sock)
return nil, errors.Wrapf(errdefs.ErrNotFound, "daemon socket %s", sock)
}
client, err := NewNydusClient(sock)
if err != nil {
return errors.Wrapf(err, "create daemon %s client", d.ID())
return nil, errors.Wrapf(err, "create daemon %s client", d.ID())
}
d.client = client
}
return nil

return d.client, nil
}

func (d *Daemon) ResetClient() {
d.cmu.Lock()
d.client = nil
d.cmu.Unlock()
}

func (d *Daemon) Terminate() error {
Expand Down Expand Up @@ -571,7 +564,7 @@ func (d *Daemon) Wait() error {
// nydus-snapshotter, p.Wait() will return err, so here should exclude this case
if _, err = p.Wait(); err != nil && !errors.Is(err, syscall.ECHILD) {
log.L.Errorf("failed to process wait, %v", err)
} else if d.HostMountpoint() != "" || config.GetFsDriver() != config.FsDriverFscache {
} else if d.HostMountpoint() != "" && config.GetFsDriver() == config.FsDriverFusedev {
// No need to umount if the nydusd never performs mount. In other word, it does not
// associate with a host mountpoint.
if err := mount.WaitUntilUnmounted(d.HostMountpoint()); err != nil {
Expand All @@ -587,7 +580,7 @@ func (d *Daemon) Wait() error {
func (d *Daemon) ClearVestige() {
mounter := mount.Mounter{}
if d.States.FsDriver == config.FsDriverFscache {
instances := d.Instances.List()
instances := d.RafsCache.List()
for _, i := range instances {
if err := mounter.Umount(i.GetMountpoint()); err != nil {
log.L.Warnf("Can't umount %s, %v", d.States.Mountpoint, err)
Expand All @@ -612,25 +605,19 @@ func (d *Daemon) ClearVestige() {
d.ResetClient()
}

func (d *Daemon) CloneInstances(src *Daemon) {
src.Instances.Lock()
defer src.Instances.Unlock()

instances := src.Instances.ListLocked()

d.Lock()
defer d.Unlock()
d.Instances.instances = instances
func (d *Daemon) CloneRafsInstances(src *Daemon) {
instances := src.RafsCache.List()
d.RafsCache.SetIntances(instances)
}

// Daemon must be started and reach RUNNING state before call this method
func (d *Daemon) RecoveredMountInstances() error {
func (d *Daemon) RecoverRafsInstances() error {
if d.IsSharedDaemon() {
d.Instances.Lock()
defer d.Instances.Unlock()
d.RafsCache.Lock()
defer d.RafsCache.Unlock()

instances := make([]*Rafs, 0, 16)
for _, r := range d.Instances.ListLocked() {
instances := make([]*rafs.Rafs, 0, 16)
for _, r := range d.RafsCache.ListLocked() {
instances = append(instances, r)
}

Expand All @@ -656,7 +643,7 @@ func NewDaemon(opt ...NewDaemonOpt) (*Daemon, error) {
d := &Daemon{}
d.States.ID = newID()
d.States.DaemonMode = config.DaemonModeDedicated
d.Instances = rafsSet{instances: make(map[string]*Rafs)}
d.RafsCache = rafs.NewRafsCache()

for _, o := range opt {
err := o(d)
Expand Down
Loading

0 comments on commit a2d744b

Please sign in to comment.