
Commit 1eb2a9b (parent: 79bd749)

[RayJob] Lift cluster status while initializing

Signed-off-by: Spencer Peterson <spencerjp@google.com>

3 files changed: 80 additions, 16 deletions


ray-operator/controllers/ray/rayjob_controller.go

Lines changed: 19 additions & 4 deletions
@@ -152,8 +152,8 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
         rayJobInstance.Status.Reason = rayv1.ValidationFailed
         rayJobInstance.Status.Message = err.Error()
 
-        // This is the only 2 places where we update the RayJob status. This will directly
-        // update the JobDeploymentStatus to ValidationFailed if there's validation error
+        // This is one of the only 3 places where we update the RayJob status. This will directly
+        // update the JobDeploymentStatus to ValidationFailed if there's validation error.
         if err = r.updateRayJobStatus(ctx, originalRayJobInstance, rayJobInstance); err != nil {
             logger.Info("Failed to update RayJob status", "error", err)
             return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
@@ -204,6 +204,11 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
     if clientURL := rayJobInstance.Status.DashboardURL; clientURL == "" {
         if rayClusterInstance.Status.State != rayv1.Ready {
             logger.Info("Wait for the RayCluster.Status.State to be ready before submitting the job.", "RayCluster", rayClusterInstance.Name, "State", rayClusterInstance.Status.State)
+            // This is one of only 3 places where we update the RayJob status. For observability
+            // while waiting for the RayCluster to become ready, we lift the cluster status.
+            if err := r.updateRayJobClusterStatus(ctx, originalRayJobInstance, rayClusterInstance.Status); err != nil {
+                logger.Info("Failed to update RayJob status", "error", err)
+            }
             return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
         }
 
@@ -419,8 +424,8 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
     }
     checkBackoffLimitAndUpdateStatusIfNeeded(ctx, rayJobInstance)
 
-    // This is the only 2 places where we update the RayJob status. Please do NOT add any code
-    // between `checkBackoffLimitAndUpdateStatusIfNeeded` and the following code.
+    // This is one of the only 3 places where we update the RayJob status. Please do NOT add any
+    // code between `checkBackoffLimitAndUpdateStatusIfNeeded` and the following code.
     if err = r.updateRayJobStatus(ctx, originalRayJobInstance, rayJobInstance); err != nil {
         logger.Info("Failed to update RayJob status", "error", err)
         return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
@@ -872,6 +877,16 @@ func (r *RayJobReconciler) updateRayJobStatus(ctx context.Context, oldRayJob *ra
     return nil
 }
 
+// updateRayJobClusterStatus unconditionally updates only the RayClusterStatus
+// subfield of the RayJobStatus.
+func (r *RayJobReconciler) updateRayJobClusterStatus(ctx context.Context, oldRayJob *rayv1.RayJob, newStatus rayv1.RayClusterStatus) error {
+    logger := ctrl.LoggerFrom(ctx)
+    newRayJob := oldRayJob.DeepCopy()
+    newRayJob.Status.RayClusterStatus = newStatus
+    logger.Info("updateRayJobStatusClusterStatus", "oldRayClusterStatus", oldRayJob.Status.RayClusterStatus, "newRayClusterStatus", newStatus)
+    return r.Status().Update(ctx, newRayJob)
+}
+
 func (r *RayJobReconciler) getOrCreateRayClusterInstance(ctx context.Context, rayJobInstance *rayv1.RayJob) (*rayv1.RayCluster, error) {
     logger := ctrl.LoggerFrom(ctx)
     rayClusterNamespacedName := common.RayJobRayClusterNamespacedName(rayJobInstance)
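The net effect of this change is that a RayJob's .status.rayClusterStatus is populated while the job is still Initializing, instead of only after the underlying RayCluster reports Ready. As a rough illustration of what that enables (not part of this commit; the namespace and job name below are placeholders), a Go client built with controller-runtime could read the lifted status like this:

package main

import (
    "context"
    "fmt"

    rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
    "k8s.io/apimachinery/pkg/runtime"
    clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

func main() {
    // Register both the built-in and the Ray API types.
    scheme := runtime.NewScheme()
    _ = clientgoscheme.AddToScheme(scheme)
    _ = rayv1.AddToScheme(scheme)

    // Connect using the local kubeconfig.
    c, err := client.New(ctrl.GetConfigOrDie(), client.Options{Scheme: scheme})
    if err != nil {
        panic(err)
    }

    // "rayjob-sample" in "default" is a placeholder, not something this commit creates.
    rayJob := &rayv1.RayJob{}
    if err := c.Get(context.Background(), client.ObjectKey{Namespace: "default", Name: "rayjob-sample"}, rayJob); err != nil {
        panic(err)
    }

    // While the job is still Initializing, the lifted RayClusterStatus already
    // carries the cluster's conditions, so progress is visible before Ready.
    fmt.Println("JobDeploymentStatus:", rayJob.Status.JobDeploymentStatus)
    for _, cond := range rayJob.Status.RayClusterStatus.Conditions {
        fmt.Printf("cluster condition %s=%s: %s\n", cond.Type, cond.Status, cond.Message)
    }
}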

ray-operator/controllers/ray/rayjob_controller_test.go

Lines changed: 28 additions & 0 deletions
@@ -241,6 +241,34 @@ var _ = Context("RayJob with different submission modes", func() {
         Expect(rayCluster.Annotations).Should(Equal(rayJob.Annotations))
     })
 
+    It("In Initializing state, the JobStatus should show the RayCluster status", func() {
+        // The RayCluster is not 'Ready' yet because Pods are not running and ready.
+        Expect(rayCluster.Status.State).NotTo(Equal(rayv1.Ready))
+
+        updateHeadPodToRunningNotReady(ctx, rayJob.Status.RayClusterName, namespace)
+
+        // Now the cluster should have nonzero conditions.
+        Eventually(
+            func() int {
+                status := getClusterStatus(ctx, namespace, rayCluster.Name)()
+                return len(status.Conditions)
+            },
+            time.Second*3, time.Millisecond*500).ShouldNot(Equal(0))
+
+        // We expect the RayJob's RayClusterStatus to eventually mirror the cluster's status.
+        Eventually(
+            func() (int, error) {
+                currentRayJob := &rayv1.RayJob{}
+                err := k8sClient.Get(ctx, client.ObjectKey{Name: rayJob.Name, Namespace: namespace}, currentRayJob)
+                if err != nil {
+                    return 0, err
+                }
+                return len(currentRayJob.Status.RayClusterStatus.Conditions), nil
+            },
+            time.Second*3, time.Millisecond*500,
+        ).ShouldNot(Equal(0))
+    })
+
     It("Make RayCluster.Status.State to be rayv1.Ready", func() {
         // The RayCluster is not 'Ready' yet because Pods are not running and ready.
         Expect(rayCluster.Status.State).NotTo(Equal(rayv1.Ready))
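The new test leans on a suite helper, getClusterStatus, that is not part of this diff. For readers unfamiliar with the suite, a minimal sketch of the shape that helper would need (an assumption, not the actual suite_helpers_test.go implementation) is:

// Sketch only: returns a closure that re-reads the RayCluster's status so it
// can be polled. The real helper in the test suite may differ.
func getClusterStatus(ctx context.Context, namespace string, clusterName string) func() rayv1.RayClusterStatus {
    return func() rayv1.RayClusterStatus {
        cluster := &rayv1.RayCluster{}
        err := k8sClient.Get(ctx, client.ObjectKey{Namespace: namespace, Name: clusterName}, cluster)
        gomega.Expect(err).NotTo(gomega.HaveOccurred(), "Failed to get RayCluster %s", clusterName)
        return cluster.Status
    }
}

Returning a closure keeps the helper compatible with Gomega's polling utilities; in the new test it is invoked immediately via the trailing (), and Eventually re-runs the enclosing function on every poll.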

ray-operator/controllers/ray/suite_helpers_test.go

Lines changed: 33 additions & 12 deletions
@@ -249,6 +249,36 @@ func checkServeApplicationExists(ctx context.Context, rayService *rayv1.RayServi
 // So Pods are created, but no controller updates them from Pending to Running.
 // See https://book.kubebuilder.io/reference/envtest.html for more details.
 func updateHeadPodToRunningAndReady(ctx context.Context, rayClusterName string, namespace string) {
+    updateHeadPodToPhaseAndConditions(ctx, rayClusterName, namespace, corev1.PodRunning, []corev1.PodCondition{
+        {
+            Type:   corev1.PodReady,
+            Status: corev1.ConditionTrue,
+        },
+    })
+}
+
+func updateHeadPodToRunningNotReady(ctx context.Context, rayClusterName string, namespace string) {
+    updateHeadPodToPhaseAndConditions(ctx, rayClusterName, namespace, corev1.PodRunning, []corev1.PodCondition{
+        {
+            Type:   corev1.PodScheduled,
+            Status: corev1.ConditionTrue,
+        },
+        {
+            Type:   corev1.PodInitialized,
+            Status: corev1.ConditionTrue,
+        },
+        {
+            Type:   corev1.PodReady,
+            Status: corev1.ConditionFalse,
+        },
+        {
+            Type:   corev1.ContainersReady,
+            Status: corev1.ConditionFalse,
+        },
+    })
+}
+
+func updateHeadPodToPhaseAndConditions(ctx context.Context, rayClusterName string, namespace string, phase corev1.PodPhase, conditions []corev1.PodCondition) {
     var instance rayv1.RayCluster
     gomega.Eventually(
         getResourceFunc(ctx, client.ObjectKey{Name: rayClusterName, Namespace: namespace}, &instance),
@@ -262,19 +292,10 @@ func updateHeadPodToRunningAndReady(ctx context.Context, rayClusterName string,
         time.Second*3, time.Millisecond*500).Should(gomega.Equal(1), "Head pod list should have only 1 Pod = %v", headPods.Items)
 
     headPod := headPods.Items[0]
-    headPod.Status.Phase = corev1.PodRunning
-    headPod.Status.Conditions = []corev1.PodCondition{
-        {
-            Type:   corev1.PodReady,
-            Status: corev1.ConditionTrue,
-        },
-    }
+    headPod.Status.Phase = phase
+    headPod.Status.Conditions = conditions
     err := k8sClient.Status().Update(ctx, &headPod)
-    gomega.Expect(err).NotTo(gomega.HaveOccurred(), "Failed to update head Pod status to PodRunning")
-
-    // Make sure the head Pod is updated.
-    gomega.Eventually(
-        isAllPodsRunningByFilters).WithContext(ctx).WithArguments(headPods, headLabels).WithTimeout(time.Second*15).WithPolling(time.Millisecond*500).Should(gomega.BeTrue(), "Head Pod should be running: %v", headPods.Items)
+    gomega.Expect(err).NotTo(gomega.HaveOccurred(), "Failed to update head Pod status to not ready")
 }
 
 // Update the status of the worker Pods to Running and Ready. Similar to updateHeadPodToRunningAndReady.
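The refactor extracts a generic updateHeadPodToPhaseAndConditions so that the Ready and NotReady variants share one code path. A hypothetical follow-on use (not in this commit; phase and conditions chosen purely for illustration) shows why the generic form is handy, for example driving the head Pod into a Pending, unschedulable state in some future test:

// Sketch of a possible caller; not part of this change.
updateHeadPodToPhaseAndConditions(ctx, rayCluster.Name, namespace, corev1.PodPending, []corev1.PodCondition{
    {
        Type:   corev1.PodScheduled,
        Status: corev1.ConditionFalse,
        Reason: corev1.PodReasonUnschedulable,
    },
})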
