feat: Add initial JobController reconciler test #27

Merged · 13 commits · Apr 3, 2022
1 change: 1 addition & 0 deletions go.mod
@@ -6,6 +6,7 @@ require (
github.com/davecgh/go-spew v1.1.1
github.com/furiko-io/cronexpr v0.1.1
github.com/google/go-cmp v0.5.6
github.com/mitchellh/go-testing-interface v1.0.0
github.com/mitchellh/hashstructure/v2 v2.0.2
github.com/mitchellh/mapstructure v1.4.3
github.com/nleeper/goment v1.4.1
1 change: 1 addition & 0 deletions go.sum
@@ -479,6 +479,7 @@ github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceT
github.com/mitchellh/cli v1.1.0/go.mod h1:xcISNoH86gajksDmfB23e/pu+B+GeFRMYmoHXxx3xhI=
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-testing-interface v1.0.0 h1:fzU/JVNcaqHQEcVFAKeR41fkiLdIPrefOvVG1VZ96U0=
github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo=
github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg=
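Note on the new dependency above: mitchellh/go-testing-interface exposes a T interface that mirrors *testing.T, so test helpers can also be driven outside a test binary (for example via its RuntimeT type). The sketch below is illustrative only and is not code from this PR; the requirePositive helper is hypothetical.

package main

import (
	"fmt"

	testinginterface "github.com/mitchellh/go-testing-interface"
)

// requirePositive depends only on the shared T interface, so it accepts both a
// real *testing.T and the library's RuntimeT.
func requirePositive(t testinginterface.T, v int) {
	if v <= 0 {
		t.Fatalf("expected a positive value, got %d", v)
	}
}

func main() {
	// RuntimeT satisfies the same interface outside of "go test".
	requirePositive(&testinginterface.RuntimeT{}, 42)
	fmt.Println("ok")
}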
9 changes: 9 additions & 0 deletions pkg/execution/controllers/jobcontroller/control.go
@@ -19,6 +19,7 @@ package jobcontroller
import (
"context"

"github.com/google/go-cmp/cmp"
"github.com/pkg/errors"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -50,6 +51,10 @@ func (c *ExecutionControl) UpdateJob(ctx context.Context, rj, newRj *execution.J
return false, nil
}

if klog.V(5).Enabled() {
klog.V(5).Infof("jobcontroller: updating job, diff = %v", cmp.Diff(rj, newRj))
}

updatedRj, err := c.client.Jobs(rj.GetNamespace()).Update(ctx, newRj, metav1.UpdateOptions{})
if err != nil {
return false, err
@@ -72,6 +77,10 @@ func (c *ExecutionControl) UpdateJobStatus(ctx context.Context, rj, newRj *execu
return false, nil
}

if klog.V(5).Enabled() {
klog.V(5).Infof("jobcontroller: updating job status, diff = %v", cmp.Diff(rj, newRj))
}

updatedRj, err := c.client.Jobs(rj.GetNamespace()).UpdateStatus(ctx, newRj, metav1.UpdateOptions{})
if err != nil {
return false, err
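The change above gates the cmp.Diff call behind klog.V(5), so the relatively expensive diff is only computed when verbosity 5 or higher is enabled. A small standalone sketch of the same pattern (the spec type and log message are illustrative, not from this PR; klog here refers to k8s.io/klog/v2):

package main

import (
	"github.com/google/go-cmp/cmp"
	klog "k8s.io/klog/v2"
)

// spec stands in for the object being updated.
type spec struct {
	Replicas int
	Image    string
}

func logUpdate(oldObj, newObj spec) {
	// Only compute the (potentially expensive) diff when -v=5 or higher is set.
	if klog.V(5).Enabled() {
		klog.V(5).Infof("updating object, diff = %v", cmp.Diff(oldObj, newObj))
	}
}

func main() {
	logUpdate(spec{Replicas: 1, Image: "nginx"}, spec{Replicas: 2, Image: "nginx"})
}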
28 changes: 19 additions & 9 deletions pkg/execution/controllers/jobcontroller/controller.go
@@ -32,6 +32,7 @@ import (

configv1 "github.com/furiko-io/furiko/apis/config/v1"
"github.com/furiko-io/furiko/pkg/execution/taskexecutor"
"github.com/furiko-io/furiko/pkg/execution/tasks"
"github.com/furiko-io/furiko/pkg/generated/clientset/versioned/scheme"
executioninformers "github.com/furiko-io/furiko/pkg/generated/informers/externalversions/execution/v1alpha1"
"github.com/furiko-io/furiko/pkg/runtime/controllercontext"
@@ -62,21 +63,30 @@ type Context struct {
controllercontext.ContextInterface
podInformer coreinformers.PodInformer
jobInformer executioninformers.JobInformer
hasSynced []cache.InformerSynced
HasSynced []cache.InformerSynced
queue workqueue.RateLimitingInterface
recorder record.EventRecorder
tasks *taskexecutor.Manager
tasks tasks.ExecutorFactory
}

// NewContext returns a new Context.
func NewContext(context controllercontext.ContextInterface) *Context {
c := &Context{ContextInterface: context}

// Create recorder.
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{
Interface: c.Clientsets().Kubernetes().CoreV1().Events(""),
Interface: context.Clientsets().Kubernetes().CoreV1().Events(""),
})
c.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerName})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerName})

return NewContextWithRecorder(context, recorder)
}

// NewContextWithRecorder returns a new Context with a custom EventRecorder.
func NewContextWithRecorder(context controllercontext.ContextInterface, recorder record.EventRecorder) *Context {
c := &Context{ContextInterface: context}

// Set recorder.
c.recorder = recorder

// Create workqueue.
ratelimiter := workqueue.DefaultControllerRateLimiter()
@@ -85,12 +95,12 @@ func NewContext(context controllercontext.ContextInterface) *Context {
// Bind informers.
c.podInformer = c.Informers().Kubernetes().Core().V1().Pods()
c.jobInformer = c.Informers().Furiko().Execution().V1alpha1().Jobs()
c.hasSynced = []cache.InformerSynced{
c.HasSynced = []cache.InformerSynced{
c.podInformer.Informer().HasSynced,
c.jobInformer.Informer().HasSynced,
}

// Add task manager
// Set task manager.
c.tasks = taskexecutor.NewManager(context.Clientsets(), context.Informers())

return c
@@ -120,7 +130,7 @@ func (c *Controller) Run(ctx context.Context) error {
ctx,
controllerName,
waitForCacheSyncTimeout,
c.hasSynced...,
c.HasSynced...,
); err != nil {
klog.ErrorS(err, "jobcontroller: cache sync timeout")
return err
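The new NewContextWithRecorder constructor above exists so tests can inject their own EventRecorder instead of one wired to a real event sink. The sketch below shows the general injection pattern using client-go's FakeRecorder; the reconciler type is a stand-in for illustration and not the actual jobcontroller API.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/record"
)

// reconciler is a minimal stand-in for a controller that accepts an injected
// recorder, analogous to NewContextWithRecorder above.
type reconciler struct {
	recorder record.EventRecorder
}

func (r *reconciler) reconcile(pod *corev1.Pod) {
	r.recorder.Eventf(pod, corev1.EventTypeNormal, "Reconciled", "pod %q reconciled", pod.Name)
}

func main() {
	// In tests, a FakeRecorder replaces the broadcaster-backed recorder so
	// emitted events can be read back from its Events channel and asserted on.
	fake := record.NewFakeRecorder(10)
	r := &reconciler{recorder: fake}
	r.reconcile(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}})

	fmt.Println(<-fake.Events)
}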
221 changes: 219 additions & 2 deletions pkg/execution/controllers/jobcontroller/controller_test.go
@@ -17,22 +17,48 @@
package jobcontroller_test

import (
"strconv"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"

executiongroup "github.com/furiko-io/furiko/apis/execution"
execution "github.com/furiko-io/furiko/apis/execution/v1alpha1"
"github.com/furiko-io/furiko/pkg/execution/controllers/jobcontroller"
"github.com/furiko-io/furiko/pkg/execution/taskexecutor/podtaskexecutor"
"github.com/furiko-io/furiko/pkg/execution/tasks"
"github.com/furiko-io/furiko/pkg/utils/execution/job"
"github.com/furiko-io/furiko/pkg/utils/k8sutils"
"github.com/furiko-io/furiko/pkg/utils/testutils"
)

const (
jobNamespace = "test"
jobName = "my-sample-job"

createTime = "2021-02-09T04:06:00Z"
startTime = "2021-02-09T04:06:01Z"
killTime = "2021-02-09T04:06:10Z"
finishTime = "2021-02-09T04:06:18Z"
now = "2021-02-09T04:06:05Z"
later15m = "2021-02-09T04:21:00Z"
later60m = "2021-02-09T05:06:00Z"
finishAfterTTL = "2021-02-09T05:06:18Z"
)

var (
// Job that is to be created. Specify startTime initially, assume that
// JobQueueController has already admitted the Job.
fakeJob = &execution.Job{
ObjectMeta: metav1.ObjectMeta{
Name: jobName,
Namespace: jobNamespace,
Name: jobName,
Namespace: jobNamespace,
CreationTimestamp: testutils.Mkmtime(createTime),
Finalizers: []string{
executiongroup.DeleteDependentsFinalizer,
},
},
Spec: execution.JobSpec{
Type: execution.JobTypeAdhoc,
@@ -55,5 +81,196 @@ var (
},
},
},
Status: execution.JobStatus{
StartTime: testutils.Mkmtimep(startTime),
},
}

// Job that was created with a fully populated status.
fakeJobResult = generateJobStatusFromPod(fakeJob, fakePodResult)

// Job with pod pending.
fakeJobPending = generateJobStatusFromPod(fakeJob, fakePodPending)

// Job with kill timestamp.
fakeJobWithKillTimestamp = func() *execution.Job {
newJob := fakeJobPending.DeepCopy()
newJob.Spec.KillTimestamp = testutils.Mkmtimep(killTime)
return newJob
}()

// Job with deletion timestamp.
fakeJobWithDeletionTimestamp = func() *execution.Job {
newJob := fakeJobPending.DeepCopy()
newJob.DeletionTimestamp = testutils.Mkmtimep(killTime)
return newJob
}()

// Job with deletion timestamp whose pods are killed.
fakeJobWithDeletionTimestampAndKilledPods = func() *execution.Job {
newJob := fakeJobWithDeletionTimestamp.DeepCopy()
newJob.Status.Phase = execution.JobKilled
newJob.Status.Condition = execution.JobCondition{
Finished: &execution.JobConditionFinished{
CreatedAt: testutils.Mkmtimep(createTime),
FinishedAt: testutils.Mkmtime(killTime),
Result: execution.JobResultKilled,
},
}
newJob.Status.Tasks[0].DeletedStatus = &execution.TaskStatus{
State: execution.TaskKilled,
Result: job.GetResultPtr(execution.JobResultKilled),
Reason: "JobDeleted",
Message: "Task was killed in response to deletion of Job",
}
newJob.Status.Tasks[0].FinishTimestamp = testutils.Mkmtimep(killTime)
return newJob
}()

// Job with deletion timestamp whose pods have been deleted.
fakeJobWithDeletionTimestampAndDeletedPods = func() *execution.Job {
newJob := fakeJobWithDeletionTimestamp.DeepCopy()
newJob.Finalizers = k8sutils.RemoveFinalizer(newJob.Finalizers, executiongroup.DeleteDependentsFinalizer)
newJob.Status.Phase = execution.JobKilled
newJob.Status.Condition = execution.JobCondition{
Finished: &execution.JobConditionFinished{
CreatedAt: testutils.Mkmtimep(createTime),
FinishedAt: testutils.Mkmtime(killTime),
Result: execution.JobResultKilled,
Reason: "JobDeleted",
Message: "Task was killed in response to deletion of Job",
},
}
newJob.Status.Tasks[0].Status = execution.TaskStatus{
State: execution.TaskKilled,
Result: job.GetResultPtr(execution.JobResultKilled),
Reason: "JobDeleted",
Message: "Task was killed in response to deletion of Job",
}
newJob.Status.Tasks[0].DeletedStatus = newJob.Status.Tasks[0].Status.DeepCopy()
newJob.Status.Tasks[0].FinishTimestamp = testutils.Mkmtimep(killTime)
return newJob
}()

// Job without pending timeout.
fakeJobWithoutPendingTimeout = func() *execution.Job {
newJob := fakeJobPending.DeepCopy()
newJob.Spec.Template.Task.PendingTimeoutSeconds = pointer.Int64(0)
return newJob
}()

// Job with pod being deleted.
fakeJobPodDeleting = func() *execution.Job {
newJob := generateJobStatusFromPod(fakeJobWithKillTimestamp, fakePodTerminating)
newJob.Status.Tasks[0].DeletedStatus = &execution.TaskStatus{
State: execution.TaskKilled,
Result: job.GetResultPtr(execution.JobResultKilled),
Reason: "Deleted",
Message: "Task was killed via deletion",
}
return newJob
}()

// Job with pod already deleted.
fakeJobPodDeleted = func() *execution.Job {
newJob := fakeJobPodDeleting.DeepCopy()
newJob.Status.Phase = execution.JobKilled
newJob.Status.Condition = execution.JobCondition{
Finished: &execution.JobConditionFinished{
CreatedAt: testutils.Mkmtimep(createTime),
FinishedAt: testutils.Mkmtime(now),
Result: execution.JobResultKilled,
Reason: "Deleted",
Message: "Task was killed via deletion",
},
}
newJob.Status.Tasks[0].Status = *newJob.Status.Tasks[0].DeletedStatus.DeepCopy()
newJob.Status.Tasks[0].FinishTimestamp = testutils.Mkmtimep(now)
return newJob
}()

// Job that has succeeded.
fakeJobFinished = generateJobStatusFromPod(fakeJobResult, fakePodFinished)

// Pod that is to be created.
fakePod, _ = podtaskexecutor.NewPod(fakeJob, 1)

// Pod that adds CreationTimestamp to mimic mutation on apiserver.
fakePodResult = func() *corev1.Pod {
newPod := fakePod.DeepCopy()
newPod.CreationTimestamp = testutils.Mkmtime(createTime)
return newPod
}()

// Pod that is in Pending state.
fakePodPending = func() *corev1.Pod {
newPod := fakePodResult.DeepCopy()
newPod.Status = corev1.PodStatus{
Phase: corev1.PodPending,
StartTime: testutils.Mkmtimep(startTime),
ContainerStatuses: []corev1.ContainerStatus{
{
Name: "container",
State: corev1.ContainerState{
Waiting: &corev1.ContainerStateWaiting{
Reason: "ImagePullBackOff",
Message: "cannot pull image",
},
},
},
},
}
return newPod
}()

// Pod that is in Pending state and is in the process of being killed.
fakePodTerminating = killPod(fakePodPending, testutils.Mktime(killTime))

// Pod that is in Pending state and is in the process of being killed by pending
// timeout.
fakePodPendingTimeoutTerminating = func() *corev1.Pod {
newPod := killPod(fakePodPending, testutils.Mktime(later15m))
k8sutils.SetAnnotation(newPod, podtaskexecutor.LabelKeyKilledFromPendingTimeout, "1")
return newPod
}()

// Pod that is Succeeded.
fakePodFinished = func() *corev1.Pod {
newPod := fakePodResult.DeepCopy()
newPod.Status = corev1.PodStatus{
Phase: corev1.PodSucceeded,
StartTime: testutils.Mkmtimep(startTime),
ContainerStatuses: []corev1.ContainerStatus{
{
Name: "container",
State: corev1.ContainerState{
Terminated: &corev1.ContainerStateTerminated{
StartedAt: testutils.Mkmtime(startTime),
FinishedAt: testutils.Mkmtime(finishTime),
},
},
},
},
}
return newPod
}()
)

// generateJobStatusFromPod returns a new Job whose status is reconciled from
// the Pod.
//
// This method is basically identical to what is used in the controller, and is
// meant to reduce flaky tests by coupling any changes to internal logic to the
// fixtures themselves, thus making it suitable for integration tests.
func generateJobStatusFromPod(rj *execution.Job, pod *corev1.Pod) *execution.Job {
newJob := job.UpdateJobTaskRefs(rj, []tasks.Task{podtaskexecutor.NewPodTask(pod, nil)})
return jobcontroller.UpdateJobStatusFromTaskRefs(newJob)
}

// killPod returns a new Pod after setting the kill timestamp.
func killPod(pod *corev1.Pod, ts time.Time) *corev1.Pod {
newPod := pod.DeepCopy()
k8sutils.SetAnnotation(newPod, podtaskexecutor.LabelKeyTaskKillTimestamp, strconv.Itoa(int(ts.Unix())))
newPod.Spec.ActiveDeadlineSeconds = pointer.Int64(int64(ts.Sub(newPod.Status.StartTime.Time).Seconds()))
return newPod
}
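As a small worked check of the fixtures above: killPod sets ActiveDeadlineSeconds to the number of seconds between the pod's StartTime and the requested kill timestamp, so with the startTime and killTime constants defined earlier the pod is given 9 seconds. A standalone sketch of that arithmetic:

package main

import (
	"fmt"
	"time"
)

func main() {
	// startTime and killTime from the fixtures above.
	start := time.Date(2021, 2, 9, 4, 6, 1, 0, time.UTC)
	kill := time.Date(2021, 2, 9, 4, 6, 10, 0, time.UTC)

	// killPod sets ActiveDeadlineSeconds to the interval between StartTime and
	// the kill timestamp, in whole seconds.
	fmt.Println(int64(kill.Sub(start).Seconds())) // 9
}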