Skip to content

Batch fixes #1729

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/workloads/batch/endpoints.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ RESPONSE:
"config": {<string>: <any>},
"api_id": <string>,
"sqs_url": <string>,
"status": <string>, # will be one of the following values: status_unknown|status_enqueuing|status_running|status_enqueue_failed|status_completed_with_failures|status_succeeded|status_unexpected_error|status_worker_error|status_worker_oom|status_stopped
"status": <string>, # will be one of the following values: status_unknown|status_enqueuing|status_running|status_enqueue_failed|status_completed_with_failures|status_succeeded|status_unexpected_error|status_worker_error|status_worker_oom|status_timed_out|status_stopped
"batches_in_queue": <int> # number of batches remaining in the queue
"batch_metrics": {
"succeeded": <int> # number of succeeded batches
Expand Down
4 changes: 2 additions & 2 deletions pkg/cortex/serve/start/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def renew_message_visibility(receipt_handle: str):
continue
elif e.response["Error"]["Code"] == "AWS.SimpleQueueService.NonExistentQueue":
# there may be a delay between the cron deleting the queue and this worker stopping
cx_logger().info(
logger().info(
"failed to renew message visibility because the queue was not found"
)
else:
Expand Down Expand Up @@ -266,7 +266,7 @@ def handle_on_job_complete(message):
should_run_on_job_complete = True
time.sleep(10) # verify that the queue is empty one more time
except:
logger.exception("failed to handle on_job_complete")
logger().exception("failed to handle on_job_complete")
raise
finally:
with receipt_handle_mutex:
Expand Down
42 changes: 16 additions & 26 deletions pkg/operator/resources/batchapi/manage_resources_cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ func ManageJobResources() error {

k8sJobMap := map[string]*kbatch.Job{}
k8sJobIDSet := strset.Set{}
for _, job := range jobs {
for i := range jobs {
job := jobs[i]
k8sJobMap[job.Labels["jobID"]] = &job
k8sJobIDSet.Add(job.Labels["jobID"])
}
Expand Down Expand Up @@ -302,32 +303,21 @@ func checkIfJobCompleted(jobKey spec.JobKey, queueURL string, k8sJob *kbatch.Job
return err
}

if jobSpec.Workers == int(k8sJob.Status.Succeeded) {
if jobSpec.TotalBatchCount == batchMetrics.Succeeded {
_jobsToDelete.Remove(jobKey.ID)
return errors.FirstError(
setSucceededStatus(jobKey),
deleteJobRuntimeResources(jobKey),
)
}
if jobSpec.TotalBatchCount == batchMetrics.Succeeded {
_jobsToDelete.Remove(jobKey.ID)
return errors.FirstError(
setSucceededStatus(jobKey),
deleteJobRuntimeResources(jobKey),
)
}

// wait one more cycle for the success metrics to reach consistency
if _jobsToDelete.Has(jobKey.ID) {
_jobsToDelete.Remove(jobKey.ID)
return errors.FirstError(
setCompletedWithFailuresStatus(jobKey),
deleteJobRuntimeResources(jobKey),
)
}
} else {
if _jobsToDelete.Has(jobKey.ID) {
_jobsToDelete.Remove(jobKey.ID)
return errors.FirstError(
writeToJobLogStream(jobKey, "unexpected job state; queue is empty but cluster state still indicates that the job is still in progress"),
setUnexpectedErrorStatus(jobKey),
deleteJobRuntimeResources(jobKey),
)
}
// wait one more cycle for the success metrics to reach consistency
if _jobsToDelete.Has(jobKey.ID) {
_jobsToDelete.Remove(jobKey.ID)
return errors.FirstError(
setCompletedWithFailuresStatus(jobKey),
deleteJobRuntimeResources(jobKey),
)
}

// It takes at least 20 seconds for a worker to exit after determining that the queue is empty.
Expand Down