From 9216e9d14340b647158f3d7c3edf4a7c4cea8b76 Mon Sep 17 00:00:00 2001 From: vishal Date: Tue, 22 Dec 2020 17:05:28 -0500 Subject: [PATCH] Batch improvements --- docs/workloads/batch/endpoints.md | 2 +- pkg/cortex/serve/start/batch.py | 4 +- .../batchapi/manage_resources_cron.go | 42 +++++++------------ 3 files changed, 19 insertions(+), 29 deletions(-) diff --git a/docs/workloads/batch/endpoints.md b/docs/workloads/batch/endpoints.md index 8720faf5e3..2f16481da8 100644 --- a/docs/workloads/batch/endpoints.md +++ b/docs/workloads/batch/endpoints.md @@ -183,7 +183,7 @@ RESPONSE: "config": {: }, "api_id": , "sqs_url": , - "status": , # will be one of the following values: status_unknown|status_enqueuing|status_running|status_enqueue_failed|status_completed_with_failures|status_succeeded|status_unexpected_error|status_worker_error|status_worker_oom|status_stopped + "status": , # will be one of the following values: status_unknown|status_enqueuing|status_running|status_enqueue_failed|status_completed_with_failures|status_succeeded|status_unexpected_error|status_worker_error|status_worker_oom|status_timed_out|status_stopped "batches_in_queue": # number of batches remaining in the queue "batch_metrics": { "succeeded": # number of succeeded batches diff --git a/pkg/cortex/serve/start/batch.py b/pkg/cortex/serve/start/batch.py index 70d82fa574..c52be00100 100644 --- a/pkg/cortex/serve/start/batch.py +++ b/pkg/cortex/serve/start/batch.py @@ -102,7 +102,7 @@ def renew_message_visibility(receipt_handle: str): continue elif e.response["Error"]["Code"] == "AWS.SimpleQueueService.NonExistentQueue": # there may be a delay between the cron may deleting the queue and this worker stopping - cx_logger().info( + logger().info( "failed to renew message visibility because the queue was not found" ) else: @@ -266,7 +266,7 @@ def handle_on_job_complete(message): should_run_on_job_complete = True time.sleep(10) # verify that the queue is empty one more time except: - logger.exception("failed to handle on_job_complete") + logger().exception("failed to handle on_job_complete") raise finally: with receipt_handle_mutex: diff --git a/pkg/operator/resources/batchapi/manage_resources_cron.go b/pkg/operator/resources/batchapi/manage_resources_cron.go index 55647cf4c7..51d5506a8a 100644 --- a/pkg/operator/resources/batchapi/manage_resources_cron.go +++ b/pkg/operator/resources/batchapi/manage_resources_cron.go @@ -81,7 +81,8 @@ func ManageJobResources() error { k8sJobMap := map[string]*kbatch.Job{} k8sJobIDSet := strset.Set{} - for _, job := range jobs { + for i := range jobs { + job := jobs[i] k8sJobMap[job.Labels["jobID"]] = &job k8sJobIDSet.Add(job.Labels["jobID"]) } @@ -302,32 +303,21 @@ func checkIfJobCompleted(jobKey spec.JobKey, queueURL string, k8sJob *kbatch.Job return err } - if jobSpec.Workers == int(k8sJob.Status.Succeeded) { - if jobSpec.TotalBatchCount == batchMetrics.Succeeded { - _jobsToDelete.Remove(jobKey.ID) - return errors.FirstError( - setSucceededStatus(jobKey), - deleteJobRuntimeResources(jobKey), - ) - } + if jobSpec.TotalBatchCount == batchMetrics.Succeeded { + _jobsToDelete.Remove(jobKey.ID) + return errors.FirstError( + setSucceededStatus(jobKey), + deleteJobRuntimeResources(jobKey), + ) + } - // wait one more cycle for the success metrics to reach consistency - if _jobsToDelete.Has(jobKey.ID) { - _jobsToDelete.Remove(jobKey.ID) - return errors.FirstError( - setCompletedWithFailuresStatus(jobKey), - deleteJobRuntimeResources(jobKey), - ) - } - } else { - if _jobsToDelete.Has(jobKey.ID) { - _jobsToDelete.Remove(jobKey.ID) - return errors.FirstError( - writeToJobLogStream(jobKey, "unexpected job state; queue is empty but cluster state still indicates that the job is still in progress"), - setUnexpectedErrorStatus(jobKey), - deleteJobRuntimeResources(jobKey), - ) - } + // wait one more cycle for the success metrics to reach consistency + if _jobsToDelete.Has(jobKey.ID) { + _jobsToDelete.Remove(jobKey.ID) + return errors.FirstError( + setCompletedWithFailuresStatus(jobKey), + deleteJobRuntimeResources(jobKey), + ) } // It takes at least 20 seconds for a worker to exit after determining that the queue is empty.