Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release/v1.5] Add an option for tasks to mutate the State object #2529

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 30 additions & 7 deletions pkg/state/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,14 @@ func (s *State) runTask(node *kubeoneapi.HostConfig, task NodeTask) error {
return fail.Runtime(task(s, node, conn), "")
}

type stateMutatorFn func(original *State, tmp *State)

// RunTaskOnNodes runs the given task on the given selection of hosts.
func (s *State) RunTaskOnNodes(nodes []kubeoneapi.HostConfig, task NodeTask, parallel RunModeEnum) error {
func (s *State) RunTaskOnNodes(nodes []kubeoneapi.HostConfig, task NodeTask, parallel RunModeEnum, stateMutator stateMutatorFn) error {
var (
errorsLock sync.Mutex
aggregateErrs []error
stateMutatorLock sync.Mutex
errorsLock sync.Mutex
aggregateErrs []error
)

wg := sync.WaitGroup{}
Expand All @@ -79,6 +82,12 @@ func (s *State) RunTaskOnNodes(nodes []kubeoneapi.HostConfig, task NodeTask, par
aggregateErrs = append(aggregateErrs, fail.Runtime(err, "running task on %q", node.PublicAddress))
}

if stateMutator != nil {
stateMutatorLock.Lock()
stateMutator(s, ctx)
stateMutatorLock.Unlock()
}

wg.Done()
}(ctx, &nodes[i])
} else {
Expand All @@ -88,6 +97,10 @@ func (s *State) RunTaskOnNodes(nodes []kubeoneapi.HostConfig, task NodeTask, par

break
}

if stateMutator != nil {
stateMutator(s, ctx)
}
}
}

Expand Down Expand Up @@ -116,6 +129,16 @@ func (s *State) RunTaskOnAllNodes(task NodeTask, parallel RunModeEnum) error {

// RunTaskOnLeader runs the given task on the leader host.
func (s *State) RunTaskOnLeader(task NodeTask) error {
return s.runTaskOnLeader(task, nil)
}

// RunTaskOnLeaderWithMutator runs the given task on the leader host with a state mutator function.
func (s *State) RunTaskOnLeaderWithMutator(task NodeTask, stateMutator stateMutatorFn) error {
return s.runTaskOnLeader(task, stateMutator)
}

// RunTaskOnLeader runs the given task on the leader host.
func (s *State) runTaskOnLeader(task NodeTask, stateMutator stateMutatorFn) error {
leader, err := s.Cluster.Leader()
if err != nil {
return err
Expand All @@ -125,18 +148,18 @@ func (s *State) RunTaskOnLeader(task NodeTask) error {
leader,
}

return s.RunTaskOnNodes(hosts, task, false)
return s.RunTaskOnNodes(hosts, task, false, stateMutator)
}

// RunTaskOnFollowers runs the given task on the follower hosts.
func (s *State) RunTaskOnFollowers(task NodeTask, parallel RunModeEnum) error {
return s.RunTaskOnNodes(s.Cluster.Followers(), task, parallel)
return s.RunTaskOnNodes(s.Cluster.Followers(), task, parallel, nil)
}

func (s *State) RunTaskOnControlPlane(task NodeTask, parallel RunModeEnum) error {
return s.RunTaskOnNodes(s.Cluster.ControlPlane.Hosts, task, parallel)
return s.RunTaskOnNodes(s.Cluster.ControlPlane.Hosts, task, parallel, nil)
}

func (s *State) RunTaskOnStaticWorkers(task NodeTask, parallel RunModeEnum) error {
return s.RunTaskOnNodes(s.Cluster.StaticWorkers.Hosts, task, parallel)
return s.RunTaskOnNodes(s.Cluster.StaticWorkers.Hosts, task, parallel, nil)
}
8 changes: 3 additions & 5 deletions pkg/tasks/kubeadm_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,11 @@ import (
)

func determinePauseImage(s *state.State) error {
if rc := s.Cluster.RegistryConfiguration; rc == nil || rc.OverwriteRegistry == "" {
return nil
}

s.Logger.Infoln("Determining Kubernetes pause image...")

return s.RunTaskOnLeader(determinePauseImageExecutor)
return s.RunTaskOnLeaderWithMutator(determinePauseImageExecutor, func(original *state.State, tmp *state.State) {
original.PauseImage = tmp.PauseImage
})
}

func determinePauseImageExecutor(s *state.State, node *kubeoneapi.HostConfig, conn executor.Interface) error {
Expand Down
4 changes: 4 additions & 0 deletions pkg/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ func WithResources(t Tasks) Tasks {
return s.Cluster.CABundle != ""
},
},
{
Fn: determinePauseImage,
Operation: "determining the pause image",
},
{
Fn: patchStaticPods,
Operation: "patching static pods",
Expand Down