Skip to content

Commit

Permalink
Wait for goroutines to exit in applier manager
Browse files Browse the repository at this point in the history
Rename cancelWatcher to stop and wait until the newly added stopped
channel is closed. Also, add a stopped channel to each stack to do the
same for each stack-specific goroutine.

Signed-off-by: Tom Wieczorek <twieczorek@mirantis.com>
  • Loading branch information
twz123 committed Oct 1, 2024
1 parent db5e0d2 commit 402c728
Showing 1 changed file with 29 additions and 17 deletions.
46 changes: 29 additions & 17 deletions pkg/applier/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,19 @@ type Manager struct {
K0sVars *config.CfgVars
KubeClientFactory kubeutil.ClientFactoryInterface

applier Applier
bundlePath string
cancelWatcher context.CancelFunc
log *logrus.Entry
applier Applier
bundlePath string
stop func()
log *logrus.Entry

LeaderElector leaderelector.Interface
}

var _ manager.Component = (*Manager)(nil)

type stack = struct {
context.CancelFunc
cancel context.CancelFunc
stopped <-chan struct{}
*StackApplier
}

Expand All @@ -65,15 +66,18 @@ func (m *Manager) Init(ctx context.Context) error {
m.applier = NewApplier(m.K0sVars.ManifestsDir, m.KubeClientFactory)

m.LeaderElector.AddAcquiredLeaseCallback(func() {
watcherCtx, cancel := context.WithCancel(ctx)
m.cancelWatcher = cancel
ctx, cancel := context.WithCancel(ctx)
stopped := make(chan struct{})

m.stop = func() { cancel(); <-stopped }
go func() {
_ = m.runWatchers(watcherCtx)
defer close(stopped)
_ = m.runWatchers(ctx)
}()
})
m.LeaderElector.AddLostLeaseCallback(func() {
if m.cancelWatcher != nil {
m.cancelWatcher()
if m.stop != nil {
m.stop()
}
})

Expand All @@ -87,8 +91,8 @@ func (m *Manager) Start(_ context.Context) error {

// Stop stops the Manager
func (m *Manager) Stop() error {
if m.cancelWatcher != nil {
m.cancelWatcher()
if m.stop != nil {
m.stop()
}
return nil
}
Expand Down Expand Up @@ -140,6 +144,10 @@ func (m *Manager) runWatchers(ctx context.Context) error {

case <-ctx.Done():
log.Info("manifest watcher done")
for _, stack := range stacks {
<-stack.stopped
}

return nil
}
}
Expand All @@ -151,22 +159,25 @@ func (m *Manager) createStack(ctx context.Context, stacks map[string]stack, name
return
}

stackCtx, cancelStack := context.WithCancel(ctx)
stack := stack{cancelStack, NewStackApplier(name, m.KubeClientFactory)}
ctx, cancel := context.WithCancel(ctx)
stopped := make(chan struct{})

stack := stack{cancel, stopped, NewStackApplier(name, m.KubeClientFactory)}
stacks[name] = stack

go func() {
defer close(stopped)
log := m.log.WithField("stack", name)
for {
log.Info("Running stack")
if err := stack.Run(stackCtx); err != nil {
if err := stack.Run(ctx); err != nil {
log.WithError(err).Error("Failed to run stack")
}

select {
case <-time.After(10 * time.Second):
continue
case <-stackCtx.Done():
case <-ctx.Done():
log.Info("Stack done")
return
}
Expand All @@ -184,7 +195,8 @@ func (m *Manager) removeStack(ctx context.Context, stacks map[string]stack, name
}

delete(stacks, name)
stack.CancelFunc()
stack.cancel()
<-stack.stopped

log := m.log.WithField("stack", name)
if err := stack.DeleteStack(ctx); err != nil {
Expand Down

0 comments on commit 402c728

Please sign in to comment.