Skip to content

Commit

Permalink
Merge pull request #5303 from k0sproject/backport-5172-to-release-1.29
Browse files Browse the repository at this point in the history
[Backport release-1.29] Applier manager improvements
  • Loading branch information
jnummelin authored Dec 10, 2024
2 parents ee8705d + 1eefb17 commit 4a29c7f
Showing 1 changed file with 70 additions and 61 deletions.
131 changes: 70 additions & 61 deletions pkg/applier/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package applier

import (
"context"
"errors"
"fmt"
"path"
"time"
Expand All @@ -29,6 +30,8 @@ import (
"github.com/k0sproject/k0s/pkg/constant"
kubeutil "github.com/k0sproject/k0s/pkg/kubernetes"

"k8s.io/apimachinery/pkg/util/wait"

"github.com/fsnotify/fsnotify"
"github.com/sirupsen/logrus"
)
Expand All @@ -38,20 +41,18 @@ type Manager struct {
K0sVars *config.CfgVars
KubeClientFactory kubeutil.ClientFactoryInterface

// client kubernetes.Interface
applier Applier
bundlePath string
cancelWatcher context.CancelFunc
log *logrus.Entry
stacks map[string]stack
bundleDir string
stop func(reason string)
log *logrus.Entry

LeaderElector leaderelector.Interface
}

var _ manager.Component = (*Manager)(nil)

type stack = struct {
context.CancelFunc
cancel context.CancelCauseFunc
stopped <-chan struct{}
*StackApplier
}

Expand All @@ -62,21 +63,21 @@ func (m *Manager) Init(ctx context.Context) error {
return fmt.Errorf("failed to create manifest bundle dir %s: %w", m.K0sVars.ManifestsDir, err)
}
m.log = logrus.WithField("component", constant.ApplierManagerComponentName)
m.stacks = make(map[string]stack)
m.bundlePath = m.K0sVars.ManifestsDir

m.applier = NewApplier(m.K0sVars.ManifestsDir, m.KubeClientFactory)
m.bundleDir = m.K0sVars.ManifestsDir

m.LeaderElector.AddAcquiredLeaseCallback(func() {
watcherCtx, cancel := context.WithCancel(ctx)
m.cancelWatcher = cancel
ctx, cancel := context.WithCancelCause(ctx)
stopped := make(chan struct{})

m.stop = func(reason string) { cancel(errors.New(reason)); <-stopped }
go func() {
_ = m.runWatchers(watcherCtx)
defer close(stopped)
wait.UntilWithContext(ctx, m.runWatchers, 1*time.Minute)
}()
})
m.LeaderElector.AddLostLeaseCallback(func() {
if m.cancelWatcher != nil {
m.cancelWatcher()
if m.stop != nil {
m.stop("lost leadership")
}
})

Expand All @@ -90,107 +91,115 @@ func (m *Manager) Start(_ context.Context) error {

// Stop stops the Manager
func (m *Manager) Stop() error {
if m.cancelWatcher != nil {
m.cancelWatcher()
if m.stop != nil {
m.stop("applier manager is stopping")
}
return nil
}

func (m *Manager) runWatchers(ctx context.Context) error {
log := logrus.WithField("component", constant.ApplierManagerComponentName)

func (m *Manager) runWatchers(ctx context.Context) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.WithError(err).Error("failed to create watcher")
return err
m.log.WithError(err).Error("Failed to create watcher")
return
}
defer watcher.Close()
defer func() {
if err := watcher.Close(); err != nil {
m.log.WithError(err).Error("Failed to close watcher")
}
}()

err = watcher.Add(m.bundlePath)
err = watcher.Add(m.bundleDir)
if err != nil {
log.Warnf("Failed to start watcher: %s", err.Error())
m.log.WithError(err).Error("Failed to watch bundle directory")
return
}

m.log.Info("Starting watch loop")

// Add all directories after the bundle dir has been added to the watcher.
// Doing it the other way round introduces a race condition when directories
// get created after the initial listing but before the watch starts.

dirs, err := dir.GetAll(m.bundlePath)
dirs, err := dir.GetAll(m.bundleDir)
if err != nil {
return err
m.log.WithError(err).Error("Failed to read bundle directory")
return
}

ctx, cancel := context.WithCancelCause(ctx)
stacks := make(map[string]stack, len(dirs))

for _, dir := range dirs {
m.createStack(ctx, path.Join(m.bundlePath, dir))
m.createStack(ctx, stacks, path.Join(m.bundleDir, dir))
}

for {
select {
case err, ok := <-watcher.Errors:
if !ok {
return err
}
case err := <-watcher.Errors:
m.log.WithError(err).Error("Watch error")
cancel(err)

log.Warnf("watch error: %s", err.Error())
case event, ok := <-watcher.Events:
if !ok {
return nil
}
case event := <-watcher.Events:
switch event.Op {
case fsnotify.Create:
if dir.IsDirectory(event.Name) {
m.createStack(ctx, event.Name)
m.createStack(ctx, stacks, event.Name)
}
case fsnotify.Remove:
m.removeStack(ctx, event.Name)
m.removeStack(ctx, stacks, event.Name)
}

case <-ctx.Done():
log.Info("manifest watcher done")
return nil
m.log.Infof("Watch loop done (%v)", context.Cause(ctx))
for _, stack := range stacks {
<-stack.stopped
}

return
}
}
}

func (m *Manager) createStack(ctx context.Context, name string) {
func (m *Manager) createStack(ctx context.Context, stacks map[string]stack, name string) {
// safeguard in case the fswatcher would trigger an event for an already existing stack
if _, ok := m.stacks[name]; ok {
if _, ok := stacks[name]; ok {
return
}

stackCtx, cancelStack := context.WithCancel(ctx)
stack := stack{cancelStack, NewStackApplier(name, m.KubeClientFactory)}
m.stacks[name] = stack
ctx, cancel := context.WithCancelCause(ctx)
stopped := make(chan struct{})

stack := stack{cancel, stopped, NewStackApplier(name, m.KubeClientFactory)}
stacks[name] = stack

go func() {
defer close(stopped)
log := m.log.WithField("stack", name)
for {

wait.UntilWithContext(ctx, func(ctx context.Context) {
log.Info("Running stack")
if err := stack.Run(stackCtx); err != nil {
if err := stack.Run(ctx); err != nil {
log.WithError(err).Error("Failed to run stack")
}
}, 1*time.Minute)

select {
case <-time.After(10 * time.Second):
continue
case <-stackCtx.Done():
log.Info("Stack done")
return
}
}
log.Infof("Stack done (%v)", context.Cause(ctx))
}()
}

func (m *Manager) removeStack(ctx context.Context, name string) {
stack, ok := m.stacks[name]
func (m *Manager) removeStack(ctx context.Context, stacks map[string]stack, name string) {
stack, ok := stacks[name]
if !ok {
m.log.
WithField("path", name).
Debug("attempted to remove non-existent stack, probably not a directory")
return
}

delete(m.stacks, name)
stack.CancelFunc()
delete(stacks, name)
stack.cancel(errors.New("stack removed"))
<-stack.stopped

log := m.log.WithField("stack", name)
if err := stack.DeleteStack(ctx); err != nil {
Expand Down

0 comments on commit 4a29c7f

Please sign in to comment.