diff --git a/pkg/state/task.go b/pkg/state/task.go index b273903dd..1adf050bd 100644 --- a/pkg/state/task.go +++ b/pkg/state/task.go @@ -54,11 +54,14 @@ func (s *State) runTask(node *kubeoneapi.HostConfig, task NodeTask) error { return fail.Runtime(task(s, node, conn), "") } +type stateMutatorFn func(original *State, tmp *State) + // RunTaskOnNodes runs the given task on the given selection of hosts. -func (s *State) RunTaskOnNodes(nodes []kubeoneapi.HostConfig, task NodeTask, parallel RunModeEnum) error { +func (s *State) RunTaskOnNodes(nodes []kubeoneapi.HostConfig, task NodeTask, parallel RunModeEnum, stateMutator stateMutatorFn) error { var ( - errorsLock sync.Mutex - aggregateErrs []error + stateMutatorLock sync.Mutex + errorsLock sync.Mutex + aggregateErrs []error ) wg := sync.WaitGroup{} @@ -79,6 +82,12 @@ func (s *State) RunTaskOnNodes(nodes []kubeoneapi.HostConfig, task NodeTask, par aggregateErrs = append(aggregateErrs, fail.Runtime(err, "running task on %q", node.PublicAddress)) } + if stateMutator != nil { + stateMutatorLock.Lock() + stateMutator(s, ctx) + stateMutatorLock.Unlock() + } + wg.Done() }(ctx, &nodes[i]) } else { @@ -88,6 +97,10 @@ func (s *State) RunTaskOnNodes(nodes []kubeoneapi.HostConfig, task NodeTask, par break } + + if stateMutator != nil { + stateMutator(s, ctx) + } } } @@ -116,6 +129,16 @@ func (s *State) RunTaskOnAllNodes(task NodeTask, parallel RunModeEnum) error { // RunTaskOnLeader runs the given task on the leader host. func (s *State) RunTaskOnLeader(task NodeTask) error { + return s.runTaskOnLeader(task, nil) +} + +// RunTaskOnLeaderWithMutator runs the given task on the leader host with a state mutator function. +func (s *State) RunTaskOnLeaderWithMutator(task NodeTask, stateMutator stateMutatorFn) error { + return s.runTaskOnLeader(task, stateMutator) +} + +// RunTaskOnLeader runs the given task on the leader host. +func (s *State) runTaskOnLeader(task NodeTask, stateMutator stateMutatorFn) error { leader, err := s.Cluster.Leader() if err != nil { return err @@ -125,18 +148,18 @@ func (s *State) RunTaskOnLeader(task NodeTask) error { leader, } - return s.RunTaskOnNodes(hosts, task, false) + return s.RunTaskOnNodes(hosts, task, false, stateMutator) } // RunTaskOnFollowers runs the given task on the follower hosts. func (s *State) RunTaskOnFollowers(task NodeTask, parallel RunModeEnum) error { - return s.RunTaskOnNodes(s.Cluster.Followers(), task, parallel) + return s.RunTaskOnNodes(s.Cluster.Followers(), task, parallel, nil) } func (s *State) RunTaskOnControlPlane(task NodeTask, parallel RunModeEnum) error { - return s.RunTaskOnNodes(s.Cluster.ControlPlane.Hosts, task, parallel) + return s.RunTaskOnNodes(s.Cluster.ControlPlane.Hosts, task, parallel, nil) } func (s *State) RunTaskOnStaticWorkers(task NodeTask, parallel RunModeEnum) error { - return s.RunTaskOnNodes(s.Cluster.StaticWorkers.Hosts, task, parallel) + return s.RunTaskOnNodes(s.Cluster.StaticWorkers.Hosts, task, parallel, nil) } diff --git a/pkg/tasks/kubeadm_config.go b/pkg/tasks/kubeadm_config.go index f82db93bc..c7f7e4c4e 100644 --- a/pkg/tasks/kubeadm_config.go +++ b/pkg/tasks/kubeadm_config.go @@ -28,13 +28,11 @@ import ( ) func determinePauseImage(s *state.State) error { - if rc := s.Cluster.RegistryConfiguration; rc == nil || rc.OverwriteRegistry == "" { - return nil - } - s.Logger.Infoln("Determining Kubernetes pause image...") - return s.RunTaskOnLeader(determinePauseImageExecutor) + return s.RunTaskOnLeaderWithMutator(determinePauseImageExecutor, func(original *state.State, tmp *state.State) { + original.PauseImage = tmp.PauseImage + }) } func determinePauseImageExecutor(s *state.State, node *kubeoneapi.HostConfig, conn executor.Interface) error { diff --git a/pkg/tasks/tasks.go b/pkg/tasks/tasks.go index 873fb97ff..068a6b882 100644 --- a/pkg/tasks/tasks.go +++ b/pkg/tasks/tasks.go @@ -212,6 +212,10 @@ func WithResources(t Tasks) Tasks { return s.Cluster.CABundle != "" }, }, + { + Fn: determinePauseImage, + Operation: "determining the pause image", + }, { Fn: patchStaticPods, Operation: "patching static pods",