Skip to content

Commit

Permalink
Merge pull request #249 from changweige/upgrade
Browse files Browse the repository at this point in the history
Add nydusd live-upgrade control flow
  • Loading branch information
changweige authored Nov 25, 2022
2 parents 44bf88f + 2edfc80 commit 84d9ba3
Show file tree
Hide file tree
Showing 7 changed files with 253 additions and 40 deletions.
7 changes: 7 additions & 0 deletions pkg/daemon/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const (
endpointSendFd = "/api/v1/daemon/fuse/sendfd"
// Command nydusd to begin file system service.
endpointStart = "/api/v1/daemon/start"
endpointExit = "/api/v1/daemon/exit"

// --- V2 API begins
endpointBlobs = "/api/v2/blobs"
Expand All @@ -64,6 +65,7 @@ type NydusdClient interface {
TakeOver() error
SendFd() error
Start() error
Exit() error
}

// Nydusd API server http client used to command nydusd's action and
Expand Down Expand Up @@ -284,3 +286,8 @@ func (c *nydusdClient) Start() error {
url := c.url(endpointStart, query{})
return c.request(http.MethodPut, url, nil, nil)
}

func (c *nydusdClient) Exit() error {
url := c.url(endpointExit, query{})
return c.request(http.MethodPut, url, nil, nil)
}
13 changes: 13 additions & 0 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,19 @@ func (d *Daemon) Start() error {
return nil
}

func (d *Daemon) Exit() error {
c, err := d.GetClient()
if err != nil {
return errors.Wrapf(err, "start service")
}

if err := c.Exit(); err != nil {
return errors.Wrap(err, "request to exit service")
}

return nil
}

func (d *Daemon) GetFsMetrics(sid string) (*types.FsMetrics, error) {
c, err := d.GetClient()
if err != nil {
Expand Down
13 changes: 13 additions & 0 deletions pkg/daemon/rafs.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,19 @@ func (d *Daemon) UmountAllInstances() error {
return nil
}

func (d *Daemon) CloneInstances(src *Daemon) error {
src.Instances.Lock()
defer src.Instances.Unlock()

instances := src.Instances.ListUnlocked()

d.Lock()
defer d.Unlock()
d.Instances.instances = instances

return nil
}

func (d *Daemon) UmountInstance(r *Rafs) error {
if r.Mountpoint != d.States.Mountpoint {
if err := d.SharedUmount(r); err != nil {
Expand Down
32 changes: 24 additions & 8 deletions pkg/manager/daemon_adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ import (
"github.com/containerd/nydus-snapshotter/pkg/daemon"
"github.com/containerd/nydus-snapshotter/pkg/daemon/command"
"github.com/containerd/nydus-snapshotter/pkg/daemon/types"
"github.com/containerd/nydus-snapshotter/pkg/errdefs"
"github.com/pkg/errors"
)

// Fork the nydusd daemon with the process PID decided
func (m *Manager) StartDaemon(d *daemon.Daemon) error {
cmd, err := m.buildDaemonCommand(d)
cmd, err := m.BuildDaemonCommand(d, "", false)
if err != nil {
return errors.Wrapf(err, "create command for daemon %s", d.ID())
}
Expand Down Expand Up @@ -87,7 +88,7 @@ func (m *Manager) StartDaemon(d *daemon.Daemon) error {

// Build a daemon command which will be started to fork a new nydusd process later
// according to previously setup daemon object.
func (m *Manager) buildDaemonCommand(d *daemon.Daemon) (*exec.Cmd, error) {
func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool) (*exec.Cmd, error) {
var cmdOpts []command.Opt

nydusdThreadNum := d.NydusdThreadNum()
Expand All @@ -105,7 +106,9 @@ func (m *Manager) buildDaemonCommand(d *daemon.Daemon) (*exec.Cmd, error) {
cmdOpts = append(cmdOpts, command.WithMode("fuse"))

if d.Supervisor != nil {
cmdOpts = append(cmdOpts, command.WithSupervisor(d.Supervisor.Sock()), command.WithID(d.ID()))
cmdOpts = append(cmdOpts,
command.WithSupervisor(d.Supervisor.Sock()),
command.WithID(d.ID()))
}

if nydusdThreadNum != 0 {
Expand All @@ -115,6 +118,9 @@ func (m *Manager) buildDaemonCommand(d *daemon.Daemon) (*exec.Cmd, error) {
switch {
case !m.IsSharedDaemon():
rafs := d.Instances.Head()
if rafs == nil {
return nil, errors.Wrapf(errdefs.ErrNotFound, "daemon %s no rafs instance associated", d.ID())
}
bootstrap, err := rafs.BootstrapFile()
if err != nil {
return nil, errors.Wrapf(err, "locate bootstrap %s", bootstrap)
Expand All @@ -127,15 +133,18 @@ func (m *Manager) buildDaemonCommand(d *daemon.Daemon) (*exec.Cmd, error) {

case m.IsSharedDaemon():
cmdOpts = append(cmdOpts, command.WithMountpoint(d.HostMountpoint()))

default:
return nil, errors.Errorf("invalid daemon mode %s ", m.daemonMode)
}
}

cmdOpts = append(cmdOpts,
command.WithAPISock(d.GetAPISock()),
command.WithLogLevel(d.States.LogLevel))
command.WithLogLevel(d.States.LogLevel),
command.WithAPISock(d.GetAPISock()))

if upgrade {
cmdOpts = append(cmdOpts, command.WithUpgrade())
}

if !d.States.LogToStdout {
cmdOpts = append(cmdOpts, command.WithLogFile(d.LogFile()))
Expand All @@ -146,9 +155,16 @@ func (m *Manager) buildDaemonCommand(d *daemon.Daemon) (*exec.Cmd, error) {
return nil, err
}

log.L.Infof("Start nydusd daemon: %s %s", m.nydusdBinaryPath, strings.Join(args, " "))
var nydusdPath string
if bin == "" {
nydusdPath = m.NydusdBinaryPath
} else {
nydusdPath = bin
}

log.L.Infof("nydusd command: %s %s", nydusdPath, strings.Join(args, " "))

cmd := exec.Command(m.nydusdBinaryPath, args...)
cmd := exec.Command(nydusdPath, args...)

// nydusd standard output and standard error rather than its logs are
// always redirected to snapshotter's respectively
Expand Down
33 changes: 27 additions & 6 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/containerd/nydus-snapshotter/pkg/errdefs"
"github.com/containerd/nydus-snapshotter/pkg/store"
"github.com/containerd/nydus-snapshotter/pkg/supervisor"
"github.com/containerd/nydus-snapshotter/pkg/utils/mount"
)

type DaemonRecoverPolicy int
Expand Down Expand Up @@ -195,7 +194,7 @@ func (s *DaemonStates) Size() int {
// to avoid frequently operating DB
type Manager struct {
store Store
nydusdBinaryPath string
NydusdBinaryPath string
daemonMode config.DaemonMode
// Where nydusd stores cache files for fscache driver
cacheDir string
Expand All @@ -206,8 +205,6 @@ type Manager struct {
// supposed to refilled when nydus-snapshotter restarting.
daemonStates *DaemonStates

mounter mount.Interface

monitor LivenessMonitor
// TODO: Close me
LivenessNotifier chan deathEvent
Expand Down Expand Up @@ -343,8 +340,7 @@ func NewManager(opt Opt) (*Manager, error) {

mgr := &Manager{
store: s,
mounter: &mount.Mounter{},
nydusdBinaryPath: opt.NydusdBinaryPath,
NydusdBinaryPath: opt.NydusdBinaryPath,
daemonMode: opt.DaemonMode,
cacheDir: opt.CacheDir,
daemonStates: newDaemonStates(),
Expand Down Expand Up @@ -393,6 +389,31 @@ func (m *Manager) NewInstance(r *daemon.Rafs) error {
return m.store.AddInstance(r)
}

func (m *Manager) Lock() {
m.mu.Lock()
}

func (m *Manager) Unlock() {
m.mu.Unlock()
}

func (m *Manager) SubscribeDaemonEvent(d *daemon.Daemon) error {
if err := m.monitor.Subscribe(d.ID(), d.GetAPISock(), m.LivenessNotifier); err != nil {
log.L.Errorf("Nydusd %s probably not started", d.ID())
return errors.Wrapf(err, "subscribe daemon %s", d.ID())
}
return nil
}

func (m *Manager) UnsubscribeDaemonEvent(d *daemon.Daemon) error {
// Starting a new nydusd will re-subscribe
if err := m.monitor.Unsubscribe(d.ID()); err != nil {
log.L.Warnf("fail to unsubscribe daemon %s, %v", d.ID(), err)
return errors.Wrapf(err, "unsubscribe daemon %s", d.ID())
}
return nil
}

func (m *Manager) RemoveInstance(snapshotID string) error {
return m.store.DeleteInstance(snapshotID)
}
Expand Down
Loading

0 comments on commit 84d9ba3

Please sign in to comment.