Skip to content

Commit

Permalink
[test][integration] Relax CheckLatestEvent to CheckEventRecordedFor.
Browse files Browse the repository at this point in the history
Instead of expecting that an event is recorded last, check that the
event was recorded for a specific object.
  • Loading branch information
trasc committed Sep 30, 2024
1 parent f1fe353 commit 1fbe011
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 19 deletions.
20 changes: 20 additions & 0 deletions pkg/util/testing/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
corev1 "k8s.io/api/core/v1"
eventsv1 "k8s.io/api/events/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand Down Expand Up @@ -69,6 +70,25 @@ func CheckLatestEvent(ctx context.Context, k8sClient client.Client,
return false, fmt.Errorf("mismatch with the latest event: got r:%v t:%v n:%v, reg %v", item.Reason, item.Type, item.Note, item.Regarding)
}

// CheckEventRecordedFor checks if an event identified by eventReason, eventType, eventNote
// was recorded for the object indentified by ref.
func CheckEventRecordedFor(ctx context.Context, k8sClient client.Client,
eventReason string, eventType string, eventMessage string,
ref types.NamespacedName) (bool, error) {
events := &corev1.EventList{}
if err := k8sClient.List(ctx, events, client.InNamespace(ref.Namespace)); err != nil {
return false, err
}

for i := range events.Items {
item := &events.Items[i]
if item.InvolvedObject.Name == ref.Name && item.Reason == eventReason && item.Type == eventType && item.Message == eventMessage {
return true, nil
}
}
return false, fmt.Errorf("event not found fater checking %d events", len(events.Items))
}

// HasEventAppeared returns if an event has been emitted
func HasEventAppeared(ctx context.Context, k8sClient client.Client, event corev1.Event) (bool, error) {
events := &corev1.EventList{}
Expand Down
12 changes: 4 additions & 8 deletions test/integration/controller/jobs/job/job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,17 +169,13 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu
gomega.Expect(ctrl.SetControllerReference(createdJob, secondWl, scheme.Scheme)).Should(gomega.Succeed())
secondWl.Spec.PodSets[0].Count += 1
gomega.Expect(k8sClient.Create(ctx, secondWl)).Should(gomega.Succeed())
gomega.Eventually(func() error {
wl := &kueue.Workload{}
key := types.NamespacedName{Name: secondWl.Name, Namespace: secondWl.Namespace}
return k8sClient.Get(ctx, key, wl)
}, util.Timeout, util.Interval).Should(testing.BeNotFoundError())
util.ExpectObjectToBeDeleted(ctx, k8sClient, secondWl, false)
// check the original wl is still there
gomega.Consistently(func() error {
return k8sClient.Get(ctx, wlLookupKey, createdWorkload)
}, util.ConsistentDuration, util.Interval).Should(gomega.Succeed())
gomega.Eventually(func() bool {
ok, _ := testing.CheckLatestEvent(ctx, k8sClient, "DeletedWorkload", corev1.EventTypeNormal, fmt.Sprintf("Deleted not matching Workload: %v", workload.Key(secondWl)))
ok, _ := testing.CheckEventRecordedFor(ctx, k8sClient, "DeletedWorkload", corev1.EventTypeNormal, fmt.Sprintf("Deleted not matching Workload: %v", workload.Key(secondWl)), lookupKey)
return ok
}, util.Timeout, util.Interval).Should(gomega.BeTrue())

Expand Down Expand Up @@ -209,7 +205,7 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu
return !*createdJob.Spec.Suspend
}, util.Timeout, util.Interval).Should(gomega.BeTrue())
gomega.Eventually(func() bool {
ok, _ := testing.CheckLatestEvent(ctx, k8sClient, "Started", corev1.EventTypeNormal, fmt.Sprintf("Admitted by clusterQueue %v", clusterQueue.Name))
ok, _ := testing.CheckEventRecordedFor(ctx, k8sClient, "Started", corev1.EventTypeNormal, fmt.Sprintf("Admitted by clusterQueue %v", clusterQueue.Name), lookupKey)
return ok
}, util.Timeout, util.Interval).Should(gomega.BeTrue())
gomega.Expect(createdJob.Spec.Template.Spec.NodeSelector).Should(gomega.HaveLen(1))
Expand Down Expand Up @@ -239,7 +235,7 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu
len(createdJob.Spec.Template.Spec.NodeSelector) == 0
}, util.Timeout, util.Interval).Should(gomega.BeTrue())
gomega.Eventually(func() bool {
ok, _ := testing.CheckLatestEvent(ctx, k8sClient, "DeletedWorkload", corev1.EventTypeNormal, fmt.Sprintf("Deleted not matching Workload: %v", wlLookupKey.String()))
ok, _ := testing.CheckEventRecordedFor(ctx, k8sClient, "DeletedWorkload", corev1.EventTypeNormal, fmt.Sprintf("Deleted not matching Workload: %v", wlLookupKey.String()), lookupKey)
return ok
}, util.Timeout, util.Interval).Should(gomega.BeTrue())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ var _ = ginkgo.Describe("JobSet controller", ginkgo.Ordered, ginkgo.ContinueOnFa
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, lookupKey, createdJobSet)).Should(gomega.Succeed())
g.Expect(ptr.Deref(createdJobSet.Spec.Suspend, false)).Should(gomega.BeFalse())
ok, _ := testing.CheckLatestEvent(ctx, k8sClient, "Started", corev1.EventTypeNormal, fmt.Sprintf("Admitted by clusterQueue %v", clusterQueue.Name))
ok, _ := testing.CheckEventRecordedFor(ctx, k8sClient, "Started", corev1.EventTypeNormal, fmt.Sprintf("Admitted by clusterQueue %v", clusterQueue.Name), lookupKey)
g.Expect(ok).Should(gomega.BeTrue())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
gomega.Expect(createdJobSet.Spec.ReplicatedJobs[0].Template.Spec.Template.Spec.NodeSelector).Should(gomega.Equal(map[string]string{instanceKey: onDemandFlavor.Name}))
Expand All @@ -215,7 +215,7 @@ var _ = ginkgo.Describe("JobSet controller", ginkgo.Ordered, ginkgo.ContinueOnFa
g.Expect(jobSet.Spec.ReplicatedJobs[0].Template.Spec.Template.Spec.NodeSelector).Should(gomega.BeEmpty())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
gomega.Eventually(func(g gomega.Gomega) {
ok, _ := testing.CheckLatestEvent(ctx, k8sClient, "DeletedWorkload", corev1.EventTypeNormal, fmt.Sprintf("Deleted not matching Workload: %v", wlLookupKey.String()))
ok, _ := testing.CheckEventRecordedFor(ctx, k8sClient, "DeletedWorkload", corev1.EventTypeNormal, fmt.Sprintf("Deleted not matching Workload: %v", wlLookupKey.String()), lookupKey)
g.Expect(ok).Should(gomega.BeTrue())
}, util.Timeout, util.Interval).Should(gomega.Succeed())

Expand Down
4 changes: 2 additions & 2 deletions test/integration/controller/jobs/kubeflow/kubeflowjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func ShouldReconcileJob(ctx context.Context, k8sClient client.Client, job, creat
return !createdJob.IsSuspended()
}, util.Timeout, util.Interval).Should(gomega.BeTrue())
gomega.Eventually(func() bool {
ok, _ := testing.CheckLatestEvent(ctx, k8sClient, "Started", corev1.EventTypeNormal, fmt.Sprintf("Admitted by clusterQueue %v", clusterQueue.Name))
ok, _ := testing.CheckEventRecordedFor(ctx, k8sClient, "Started", corev1.EventTypeNormal, fmt.Sprintf("Admitted by clusterQueue %v", clusterQueue.Name), lookupKey)
return ok
}, util.Timeout, util.Interval).Should(gomega.BeTrue())
for _, psr := range podSetsResources {
Expand All @@ -167,7 +167,7 @@ func ShouldReconcileJob(ctx context.Context, k8sClient client.Client, job, creat
len(createdJob.KFJobControl.ReplicaSpecs()[ReplicaTypeWorker].Template.Spec.NodeSelector) == 0
}, util.Timeout, util.Interval).Should(gomega.BeTrue())
gomega.Eventually(func() bool {
ok, _ := testing.CheckLatestEvent(ctx, k8sClient, "DeletedWorkload", corev1.EventTypeNormal, fmt.Sprintf("Deleted not matching Workload: %v", wlLookupKey.String()))
ok, _ := testing.CheckEventRecordedFor(ctx, k8sClient, "DeletedWorkload", corev1.EventTypeNormal, fmt.Sprintf("Deleted not matching Workload: %v", wlLookupKey.String()), lookupKey)
return ok
}, util.Timeout, util.Interval).Should(gomega.BeTrue())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu
return !*createdJob.Spec.RunPolicy.Suspend
}, util.Timeout, util.Interval).Should(gomega.BeTrue())
gomega.Eventually(func() bool {
ok, _ := testing.CheckLatestEvent(ctx, k8sClient, "Started", corev1.EventTypeNormal, fmt.Sprintf("Admitted by clusterQueue %v", clusterQueue.Name))
ok, _ := testing.CheckEventRecordedFor(ctx, k8sClient, "Started", corev1.EventTypeNormal, fmt.Sprintf("Admitted by clusterQueue %v", clusterQueue.Name), lookupKey)
return ok
}, util.Timeout, util.Interval).Should(gomega.BeTrue())
gomega.Expect(createdJob.Spec.MPIReplicaSpecs[kfmpi.MPIReplicaTypeLauncher].Template.Spec.NodeSelector).Should(gomega.HaveLen(1))
Expand All @@ -210,7 +210,8 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu
len(createdJob.Spec.MPIReplicaSpecs[kfmpi.MPIReplicaTypeWorker].Template.Spec.NodeSelector) == 0
}, util.Timeout, util.Interval).Should(gomega.BeTrue())
gomega.Eventually(func() bool {
ok, _ := testing.CheckLatestEvent(ctx, k8sClient, "DeletedWorkload", corev1.EventTypeNormal, fmt.Sprintf("Deleted not matching Workload: %v", wlLookupKey.String()))
ok, _ := testing.CheckEventRecordedFor(ctx, k8sClient, "DeletedWorkload", corev1.EventTypeNormal, fmt.Sprintf("Deleted not matching Workload: %v", wlLookupKey.String()), lookupKey)
util.SyncAdmittedConditionForWorkloads(ctx, k8sClient, createdWorkload)
return ok
}, util.Timeout, util.Interval).Should(gomega.BeTrue())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ var _ = ginkgo.Describe("Pod controller", ginkgo.Ordered, ginkgo.ContinueOnFailu
util.SyncAdmittedConditionForWorkloads(ctx, k8sClient, createdWorkload)

gomega.Eventually(func(g gomega.Gomega) bool {
ok, err := testing.CheckLatestEvent(ctx, k8sClient, "Started", corev1.EventTypeNormal, fmt.Sprintf("Admitted by clusterQueue %v", clusterQueue.Name))
ok, err := testing.CheckEventRecordedFor(ctx, k8sClient, "Started", corev1.EventTypeNormal, fmt.Sprintf("Admitted by clusterQueue %v", clusterQueue.Name), lookupKey)
g.Expect(err).NotTo(gomega.HaveOccurred())
return ok
}, util.Timeout, util.Interval).Should(gomega.BeTrue())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ var _ = ginkgo.Describe("RayCluster controller", ginkgo.Ordered, ginkgo.Continue
},
},
).Obj()
//timeBeforeAdmission := time.Now()
gomega.Expect(util.SetQuotaReservation(ctx, k8sClient, createdWorkload, admission)).Should(gomega.Succeed())
util.SyncAdmittedConditionForWorkloads(ctx, k8sClient, createdWorkload)

Expand All @@ -187,7 +188,7 @@ var _ = ginkgo.Describe("RayCluster controller", ginkgo.Ordered, ginkgo.Continue
}, util.Timeout, util.Interval).Should(gomega.BeTrue())

gomega.Eventually(func() bool {
ok, _ := testing.CheckLatestEvent(ctx, k8sClient, "Started", corev1.EventTypeNormal, fmt.Sprintf("Admitted by clusterQueue %v", clusterQueue.Name))
ok, _ := testing.CheckEventRecordedFor(ctx, k8sClient, "Started", corev1.EventTypeNormal, fmt.Sprintf("Admitted by clusterQueue %v", clusterQueue.Name), lookupKey)
return ok
}, util.Timeout, util.Interval).Should(gomega.BeTrue())
gomega.Expect(createdJob.Spec.HeadGroupSpec.Template.Spec.NodeSelector).Should(gomega.HaveLen(1))
Expand All @@ -213,7 +214,7 @@ var _ = ginkgo.Describe("RayCluster controller", ginkgo.Ordered, ginkgo.Continue
return *createdJob.Spec.Suspend && len(createdJob.Spec.WorkerGroupSpecs[0].Template.Spec.NodeSelector) == 0
}, util.Timeout, util.Interval).Should(gomega.BeTrue())
gomega.Eventually(func() bool {
ok, _ := testing.CheckLatestEvent(ctx, k8sClient, "DeletedWorkload", corev1.EventTypeNormal, fmt.Sprintf("Deleted not matching Workload: %v", wlLookupKey.String()))
ok, _ := testing.CheckEventRecordedFor(ctx, k8sClient, "DeletedWorkload", corev1.EventTypeNormal, fmt.Sprintf("Deleted not matching Workload: %v", wlLookupKey.String()), lookupKey)
return ok
}, util.Timeout, util.Interval).Should(gomega.BeTrue())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu
return !createdJob.Spec.Suspend
}, util.Timeout, util.Interval).Should(gomega.BeTrue())
gomega.Eventually(func() bool {
ok, _ := testing.CheckLatestEvent(ctx, k8sClient, "Started", corev1.EventTypeNormal, fmt.Sprintf("Admitted by clusterQueue %v", clusterQueue.Name))
ok, _ := testing.CheckEventRecordedFor(ctx, k8sClient, "Started", corev1.EventTypeNormal, fmt.Sprintf("Admitted by clusterQueue %v", clusterQueue.Name), lookupKey)
return ok
}, util.Timeout, util.Interval).Should(gomega.BeTrue())
gomega.Expect(createdJob.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.NodeSelector).Should(gomega.HaveLen(1))
Expand All @@ -221,7 +221,7 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu
return createdJob.Spec.Suspend && len(createdJob.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.NodeSelector) == 0
}, util.Timeout, util.Interval).Should(gomega.BeTrue())
gomega.Eventually(func() bool {
ok, _ := testing.CheckLatestEvent(ctx, k8sClient, "DeletedWorkload", corev1.EventTypeNormal, fmt.Sprintf("Deleted not matching Workload: %v", wlLookupKey.String()))
ok, _ := testing.CheckEventRecordedFor(ctx, k8sClient, "DeletedWorkload", corev1.EventTypeNormal, fmt.Sprintf("Deleted not matching Workload: %v", wlLookupKey.String()), lookupKey)
return ok
}, util.Timeout, util.Interval).Should(gomega.BeTrue())

Expand Down

0 comments on commit 1fbe011

Please sign in to comment.