Remove failed jobs in Ray
javiermtorres committed Feb 21, 2025
1 parent 6388749 commit 96b3c55
Showing 3 changed files with 14 additions and 8 deletions.
5 changes: 3 additions & 2 deletions lumigator/backend/backend/services/jobs.py
@@ -289,7 +289,7 @@ def retrieve_job_logs(self, job_id: UUID) -> JobLogsResponse:
         except json.JSONDecodeError as e:
             raise JobUpstreamError("ray", f"JSON decode error from {resp.text or ''}") from e
 
-    async def wait_for_job_complete(self, job_id, max_wait_time_sec=None):
+    async def wait_for_job_complete(self, job_id, max_wait_time_sec):
         """Waits for a job to complete, or until a maximum wait time is reached.
 
         :param job_id: The ID of the job to wait for.
@@ -306,7 +306,7 @@ async def wait_for_job_complete(self, job_id, max_wait_time_sec=None):
         # Wait for the job to complete
         elapsed_time = 0
         while job_status not in self.TERMINAL_STATUS:
-            if max_wait_time_sec and elapsed_time >= max_wait_time_sec:
+            if elapsed_time >= max_wait_time_sec:
                 loguru.logger.info(f"Job {job_id} did not complete within the maximum wait time.")
                 break
             await asyncio.sleep(5)
@@ -560,6 +560,7 @@ def create_job(
         # - annotation jobs do not run in workflows => they trigger dataset saving here at job level
         # As JobType.ANNOTATION is not used uniformly throughout our code yet, we rely on the already
         # existing `store_to_dataset` parameter to explicitly trigger this in the annotation case
+        # FIXME add timeout to job spec too (and override at workflow?)
         if job_type == JobType.INFERENCE and request.job_config.store_to_dataset:
            self.add_background_task(self._background_tasks, self.handle_inference_job, record.id, request)
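
With the `=None` default removed, `max_wait_time_sec` is now a required argument: every caller of `wait_for_job_complete` must bound how long it polls. A minimal sketch of the resulting calling convention (the `job_service` instance, the 600-second deadline, and the plain-string status check are hypothetical, for illustration only):

    import asyncio
    from uuid import UUID

    async def await_job(job_service, job_id: UUID) -> None:
        # wait_for_job_complete polls Ray every 5 seconds and now always
        # gives up once the explicit deadline is reached.
        status = await job_service.wait_for_job_complete(job_id, max_wait_time_sec=600)
        # String comparison for illustration; the service compares against
        # JobStatus values.
        if status != "SUCCEEDED":
            # Callers such as the workflow pipeline react by stopping the
            # Ray job (see the workflows.py hunks below).
            print(f"Job {job_id} did not finish in time: {status}")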
16 changes: 10 additions & 6 deletions lumigator/backend/backend/services/workflows.py
@@ -60,7 +60,8 @@ def __init__(
         # TODO: rely on https://github.com/ray-project/ray/blob/7c2a200ef84f17418666dad43017a82f782596a3/python/ray/dashboard/modules/job/common.py#L53
         self.TERMINAL_STATUS = [JobStatus.FAILED.value, JobStatus.SUCCEEDED.value]
 
-    def _stop_job(job_id: UUID):
+    # Maybe move to the job service?
+    def _stop_job(self, job_id: UUID):
         resp = requests.post(urljoin(settings.RAY_JOBS_URL, f"{job_id}/stop"), timeout=5)  # 5 seconds
         if resp.status_code == HTTPStatus.NOT_FOUND:
             raise JobUpstreamError("ray", "job_id not found when retrieving logs") from None
@@ -69,11 +70,6 @@ def _stop_job(job_id: UUID):
                 "ray",
                 f"Unexpected status code getting job logs: {resp.status_code}, error: {resp.text or ''}",
             ) from None
-        try:
-            metadata = json.loads(resp.text)
-            return JobLogsResponse(**metadata)
-        except json.JSONDecodeError as e:
-            raise JobUpstreamError("ray", f"JSON decode error from {resp.text or ''}") from e
 
     async def _run_inference_eval_pipeline(
         self,
@@ -111,6 +107,10 @@ async def _run_inference_eval_pipeline(
         status = await self._job_service.wait_for_job_complete(inference_job.id, max_wait_time_sec=request.job_timeout)
         if status != JobStatus.SUCCEEDED:
             loguru.logger.error(f"Inference job {inference_job.id} failed")
+            try:
+                self._stop_job(inference_job.id)
+            except JobUpstreamError:
+                loguru.logger.error(f"Failed to stop infer job {inference_job.id}, continuing")
             self._tracking_client.update_workflow_status(workflow.id, WorkflowStatus.FAILED)
             raise Exception(f"Inference job {inference_job.id} failed")
@@ -151,6 +151,10 @@ async def _run_inference_eval_pipeline(
         self._job_service._validate_results(evaluation_job.id, self._dataset_service.s3_filesystem)
         if status != JobStatus.SUCCEEDED:
             loguru.logger.error(f"Evaluation job {evaluation_job.id} failed")
+            try:
+                self._stop_job(evaluation_job.id)
+            except JobUpstreamError:
+                loguru.logger.error(f"Failed to stop eval job {evaluation_job.id}, continuing")
             self._tracking_client.update_workflow_status(workflow.id, WorkflowStatus.FAILED)
         try:
             loguru.logger.info("Handling evaluation result")
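
`_stop_job` drives the stop through Ray's Jobs REST API (a POST to `{RAY_JOBS_URL}/{job_id}/stop`). The same operation is also exposed by Ray's Python client; a minimal sketch, assuming a Ray dashboard reachable at http://127.0.0.1:8265 and a hypothetical submission ID:

    from ray.job_submission import JobSubmissionClient

    # Equivalent to the raw HTTP call in _stop_job, via Ray's client library.
    client = JobSubmissionClient("http://127.0.0.1:8265")
    stopped = client.stop_job("raysubmit_example123")  # hypothetical ID
    print("stop requested" if stopped else "job already in a terminal state")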
1 change: 1 addition & 0 deletions
@@ -253,6 +253,7 @@ def run_workflow(local_client: TestClient, dataset_id, experiment_id, workflow_n
             "dataset": str(dataset_id),
             "experiment_id": experiment_id,
             "max_samples": 1,
+            "job_timeout": 1,
         },
     ).json()
 )
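Setting `job_timeout` to one second makes `wait_for_job_complete` hit its deadline almost immediately, so the test exercises the new stop-and-fail path in `_run_inference_eval_pipeline` without waiting on a long-running Ray job.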
