Skip to content
This repository has been archived by the owner on Sep 12, 2023. It is now read-only.

Remove the podControl and serviceControl interfaces #36

Merged
merged 4 commits into from
May 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions job_controller/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,14 @@ func (jc *JobController) ReconcileJobs(

oldStatus := jobStatus.DeepCopy()

pods, err := jc.GetPodsForJob(metaObject)
pods, err := jc.Controller.GetPodsForJob(job)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to specify Controller here? Interface embedding will do it for us, I think.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's due to the controller field is explicitly used in the struct.

this was there originally, we can remove it in a separate PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha. SGTM


if err != nil {
log.Warnf("GetPodsForJob error %v", err)
return err
}

services, err := jc.GetServicesForJob(metaObject)
services, err := jc.Controller.GetServicesForJob(job)

if err != nil {
log.Warnf("GetServicesForJob error %v", err)
Expand Down
22 changes: 9 additions & 13 deletions job_controller/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ type ControllerInterface interface {
// Returns the Job from API server
GetJobFromAPIClient(namespace, name string) (metav1.Object, error)

// GetPodsForJob returns the pods managed by the job. This can be achieved by selecting pods using label key "job-name"
// i.e. all pods created by the job will come with label "job-name" = <this_job_name>
GetPodsForJob(job interface{}) ([]*v1.Pod, error)

// GetServicesForJob returns the services managed by the job. This can be achieved by selecting services using label key "job-name"
// i.e. all services created by the job will come with label "job-name" = <this_job_name>
GetServicesForJob(job interface{}) ([]*v1.Service, error)

// DeleteJob deletes the job
DeleteJob(job interface{}) error

Expand All @@ -71,7 +79,7 @@ type ControllerInterface interface {
DeleteService(job interface{}, name string, namespace string) error

// CreatePod creates the pod
CreatePod(job interface{}, podTemplate *v1.PodTemplateSpec) error
CreatePod(job interface{}, pod *v1.Pod) error

// DeletePod deletes the pod
DeletePod(job interface{}, pod *v1.Pod) error
Expand Down Expand Up @@ -179,16 +187,6 @@ func NewJobController(
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClientSet.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerImpl.ControllerName()})

realPodControl := RealPodControl{
KubeClient: kubeClientSet,
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerImpl.ControllerName()}),
}

realServiceControl := RealServiceControl{
KubeClient: kubeClientSet,
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerImpl.ControllerName()}),
}

jobControllerConfig := JobControllerConfiguration{
ReconcilerSyncLoopPeriod: reconcilerSyncPeriod,
EnableGangScheduling: enableGangScheduling,
Expand All @@ -197,8 +195,6 @@ func NewJobController(
jc := JobController{
Controller: controllerImpl,
Config: jobControllerConfig,
PodControl: realPodControl,
ServiceControl: realServiceControl,
KubeClientSet: kubeClientSet,
KubeBatchClientSet: kubeBatchClientSet,
Expectations: controller.NewControllerExpectations(),
Expand Down
75 changes: 39 additions & 36 deletions job_controller/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import (
"strconv"
"strings"

"github.com/golang/glog"
log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -191,41 +193,6 @@ func (jc *JobController) DeletePod(obj interface{}) {
jc.WorkQueue.Add(jobKey)
}

// getPodsForJob returns the set of pods that this job should manage.
// It also reconciles ControllerRef by adopting/orphaning.
// Note that the returned Pods are pointers into the cache.
func (jc *JobController) GetPodsForJob(job metav1.Object) ([]*v1.Pod, error) {
// Create selector.
selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
MatchLabels: jc.GenLabels(job.GetName()),
})

if err != nil {
return nil, fmt.Errorf("couldn't convert Job selector: %v", err)
}
// List all pods to include those that don't match the selector anymore
// but have a ControllerRef pointing to this controller.
pods, err := jc.PodLister.Pods(job.GetNamespace()).List(labels.Everything())
if err != nil {
return nil, err
}

// If any adoptions are attempted, we should first recheck for deletion
// with an uncached quorum read sometime after listing Pods (see #42639).

canAdoptFunc := RecheckDeletionTimestamp(func() (metav1.Object, error) {
fresh, err := jc.Controller.GetJobFromAPIClient(job.GetNamespace(), job.GetName())
if err != nil {
return nil, err
}
if fresh.GetUID() != job.GetUID() {
return nil, fmt.Errorf("original Job %v/%v is gone: got uid %v, wanted %v", job.GetNamespace(), job.GetName(), fresh.GetUID(), job.GetUID())
}
return fresh, nil
})
cm := controller.NewPodControllerRefManager(jc.PodControl, job, selector, jc.Controller.GetAPIGroupVersionKind(), canAdoptFunc)
return cm.ClaimPods(pods)
}

// FilterPodsForReplicaType returns pods belong to a replicaType.
func (jc *JobController) FilterPodsForReplicaType(pods []*v1.Pod, replicaType string) ([]*v1.Pod, error) {
Expand Down Expand Up @@ -420,8 +387,9 @@ func (jc *JobController) createNewPod(job interface{}, rt, index string, spec *c
podTemplate.Spec.SchedulerName = gangSchedulerName
}
}
controllerRef := jc.GenOwnerReference(metaObject)

err = jc.Controller.CreatePod(job, podTemplate)
err = jc.createPodWithControllerRef(metaObject.GetNamespace(), podTemplate, runtimeObject, controllerRef)
if err != nil && errors.IsTimeout(err) {
// Pod is created but its initialization has timed out.
// If the initialization is successful eventually, the
Expand All @@ -437,6 +405,41 @@ func (jc *JobController) createNewPod(job interface{}, rt, index string, spec *c
return nil
}

func (jc *JobController) createPodWithControllerRef(namespace string, template *v1.PodTemplateSpec,
controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error {
if err := validateControllerRef(controllerRef); err != nil {
return err
}
return jc.createPod("", namespace, template, controllerObject, controllerRef)
}

func (jc *JobController) createPod(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
pod, err := GetPodFromTemplate(template, object, controllerRef)
pod.Namespace = namespace
if err != nil {
return err
}
if len(nodeName) != 0 {
pod.Spec.NodeName = nodeName
}
if labels.Set(pod.Labels).AsSelectorPreValidated().Empty() {
return fmt.Errorf("unable to create pods, no labels")
}
if err := jc.Controller.CreatePod(object, pod); err != nil {
jc.Recorder.Eventf(object, v1.EventTypeWarning, FailedCreatePodReason, "Error creating: %v", err)
return err
} else {
accessor, err := meta.Accessor(object)
if err != nil {
glog.Errorf("parentObject does not have ObjectMeta, %v", err)
return nil
}
glog.V(4).Infof("Controller %v created pod %v", accessor.GetName(), pod.Name)
jc.Recorder.Eventf(object, v1.EventTypeNormal, SuccessfulCreatePodReason, "Created pod: %v", pod.Name)
}
return nil
}

func setRestartPolicy(podTemplateSpec *v1.PodTemplateSpec, spec *common.ReplicaSpec) {
// This is necessary since restartPolicyExitCode is not supported in v1.PodTemplateSpec
if spec.RestartPolicy == common.RestartPolicyExitCode {
Expand Down
76 changes: 40 additions & 36 deletions job_controller/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import (
log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/kubernetes/pkg/controller"
)
Expand Down Expand Up @@ -81,41 +83,6 @@ func (jc *JobController) DeleteService(obj interface{}) {
// TODO(CPH): handle this gracefully.
}

// getServicesForJob returns the set of services that this job should manage.
// It also reconciles ControllerRef by adopting/orphaning.
// Note that the returned services are pointers into the cache.
func (jc *JobController) GetServicesForJob(job metav1.Object) ([]*v1.Service, error) {
// Create selector
selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
MatchLabels: jc.GenLabels(job.GetName()),
})

if err != nil {
return nil, fmt.Errorf("couldn't convert Job selector: %v", err)
}
// List all services to include those that don't match the selector anymore
// but have a ControllerRef pointing to this controller.
services, err := jc.ServiceLister.Services(job.GetNamespace()).List(labels.Everything())
if err != nil {
return nil, err
}

// If any adoptions are attempted, we should first recheck for deletion
// with an uncached quorum read sometime after listing services (see #42639).
canAdoptFunc := RecheckDeletionTimestamp(func() (metav1.Object, error) {
fresh, err := jc.Controller.GetJobFromInformerCache(job.GetNamespace(), job.GetName())
if err != nil {
return nil, err
}
if fresh.GetUID() != job.GetUID() {
return nil, fmt.Errorf("original Job %v/%v is gone: got uid %v, wanted %v", job.GetNamespace(), job.GetName(), fresh.GetUID(), job.GetUID())
}
return fresh, nil
})
cm := NewServiceControllerRefManager(jc.ServiceControl, job, selector, jc.Controller.GetAPIGroupVersionKind(), canAdoptFunc)
return cm.ClaimServices(services)
}

// FilterServicesForReplicaType returns service belong to a replicaType.
func (jc *JobController) FilterServicesForReplicaType(services []*v1.Service, replicaType string) ([]*v1.Service, error) {
var result []*v1.Service
Expand Down Expand Up @@ -255,8 +222,10 @@ func (jc *JobController) CreateNewService(job metav1.Object, rtype commonv1.Repl

service.Name = GenGeneralName(job.GetName(), rt, index)
service.Labels = labels
// Create OwnerReference.
controllerRef := jc.GenOwnerReference(job)

err = jc.Controller.CreateService(job, service)
err = jc.CreateServicesWithControllerRef(job.GetNamespace(), service, job.(runtime.Object), controllerRef)
if err != nil && errors.IsTimeout(err) {
// Service is created but its initialization has timed out.
// If the initialization is successful eventually, the
Expand All @@ -271,3 +240,38 @@ func (jc *JobController) CreateNewService(job metav1.Object, rtype commonv1.Repl
}
return nil
}

func (jc *JobController) CreateServicesWithControllerRef(namespace string, service *v1.Service, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error {
if err := validateControllerRef(controllerRef); err != nil {
return err
}
return jc.createServices(namespace, service, controllerObject, controllerRef)
}

func (jc *JobController) createServices(namespace string, service *v1.Service, object runtime.Object, controllerRef *metav1.OwnerReference) error {
if labels.Set(service.Labels).AsSelectorPreValidated().Empty() {
return fmt.Errorf("unable to create Services, no labels")
}
serviceWithOwner, err := getServiceFromTemplate(service, object, controllerRef)
serviceWithOwner.Namespace = namespace
if err != nil {
jc.Recorder.Eventf(object, v1.EventTypeWarning, FailedCreateServiceReason, "Error creating: %v", err)
return fmt.Errorf("unable to create services: %v", err)
}

err = jc.Controller.CreateService(object, serviceWithOwner)
if err != nil {
jc.Recorder.Eventf(object, v1.EventTypeWarning, FailedCreateServiceReason, "Error creating: %v", err)
return fmt.Errorf("unable to create services: %v", err)
}

accessor, err := meta.Accessor(object)
if err != nil {
log.Errorf("parentObject does not have ObjectMeta, %v", err)
return nil
}
log.Infof("Controller %v created service %v", accessor.GetName(), serviceWithOwner.Name)
jc.Recorder.Eventf(object, v1.EventTypeNormal, SuccessfulCreateServiceReason, "Created service: %v", serviceWithOwner.Name)

return nil
}
12 changes: 11 additions & 1 deletion job_controller/test_job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,22 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
)

var _ ControllerInterface = &TestJobController{}

type TestJobController struct {
job *testv1.TestJob
pods []*corev1.Pod
services []*corev1.Service
}

func (t TestJobController) GetPodsForJob(job interface{}) ([]*corev1.Pod, error) {
return []*corev1.Pod{}, nil
}

func (t TestJobController) GetServicesForJob(job interface{}) ([]*corev1.Service, error) {
return []*corev1.Service{}, nil
}

func (TestJobController) ControllerName() string {
return "test-operator"
}
Expand Down Expand Up @@ -79,7 +89,7 @@ func (t *TestJobController) DeleteService(job interface{}, name string, namespac
return nil
}

func (t *TestJobController) CreatePod(job interface{}, podTemplate *corev1.PodTemplateSpec) error {
func (t *TestJobController) CreatePod(job interface{}, pod *corev1.Pod) error {
return nil
}

Expand Down