Failed batches processing with dead-letter queue #1713

Merged: 32 commits, Dec 22, 2020

Commits
3c875e3
Batch branch recovery
lapaniku Dec 3, 2020
763b61f
DL test
lapaniku Dec 4, 2020
6234111
Cortex conf
lapaniku Dec 4, 2020
043ff54
Batch.py fix
lapaniku Dec 4, 2020
8c388e2
Batch.py fix 2
lapaniku Dec 4, 2020
76e2955
Revert batch.py
lapaniku Dec 4, 2020
8f322ff
SQS region
lapaniku Dec 4, 2020
f9d6a37
Disable job clean up cron
vishalbollu Dec 7, 2020
307d547
Set default queue visibility timeout to 60 seconds
vishalbollu Dec 7, 2020
a766aed
Batch.py renewal update
lapaniku Dec 9, 2020
0ad5fab
Merge branch 'failed_batches' of https://github.com/lapaniku/cortex i…
lapaniku Dec 9, 2020
cb4f057
Merge branch 'master' into failed_batches
vishalbollu Dec 14, 2020
2d2a0bc
Add message renewal and on job complete message retries
vishalbollu Dec 14, 2020
383d583
Failed predict test
lapaniku Dec 14, 2020
0751891
Fix sqs client reference
lapaniku Dec 15, 2020
f9fb08f
Update batch.py
vishalbollu Dec 15, 2020
beb1844
Cast uuid to str
vishalbollu Dec 15, 2020
2b05878
Update batch.py
vishalbollu Dec 15, 2020
d5724ef
Update batch.py
vishalbollu Dec 15, 2020
252f794
Add DLQ to job submission and retry batches on exception in Python
vishalbollu Dec 18, 2020
242b811
Merge branch 'master' into failed_batches
vishalbollu Dec 18, 2020
69a01c3
Clean up PR
vishalbollu Dec 18, 2020
91b8cda
Remove print statements
vishalbollu Dec 20, 2020
391b0a6
Merge branch 'master' into failed_batches
vishalbollu Dec 21, 2020
c16f502
Update job status calculation criteria
vishalbollu Dec 21, 2020
5141f29
Remove testing examples
vishalbollu Dec 21, 2020
7523aa2
Cleanup PR
vishalbollu Dec 21, 2020
bc51679
Respond to PR comments
vishalbollu Dec 22, 2020
d1f41ff
Merge branch 'master' into failed_batches
vishalbollu Dec 22, 2020
ddab204
Merge branch 'master' into failed_batches
vishalbollu Dec 22, 2020
44461bf
Update batch.py
vishalbollu Dec 22, 2020
b456fa6
Merge branch 'master' into failed_batches
vishalbollu Dec 22, 2020
4 changes: 2 additions & 2 deletions cli/cmd/lib_batch_apis.go
@@ -121,7 +121,7 @@ func batchAPITable(batchAPI schema.APIResponse) string {
			{Title: "job id"},
			{Title: "status"},
			{Title: "progress"}, // (succeeded/total)
			{Title: "failed", Hidden: totalFailed == 0},
			{Title: "failed attempts", Hidden: totalFailed == 0},
			{Title: "start time"},
			{Title: "duration"},
		},
@@ -200,7 +200,7 @@ func getJob(env cliconfig.Environment, apiName string, jobID string) (string, er
		Headers: []table.Header{
			{Title: "total"},
			{Title: "succeeded"},
			{Title: "failed"},
			{Title: "failed attempts"},
			{Title: "avg time per batch"},
		},
		Rows: [][]interface{}{
28 changes: 26 additions & 2 deletions docs/workloads/batch/endpoints.md
@@ -33,6 +33,10 @@ POST <batch_api_endpoint>:
{
    "workers": <int>,              # the number of workers to allocate for this job (required)
    "timeout": <int>,              # duration in seconds since the submission of a job before it is terminated (optional)
    "sqs_dead_letter_queue": {     # specify a queue to redirect failed batches (optional)
        "arn": <string>,               # ARN of the dead letter queue, e.g. arn:aws:sqs:us-west-2:123456789:failed.fifo
        "max_receive_count": <int>     # number of times a batch is allowed to be handled by a worker before it is considered failed and transferred to the dead letter queue (must be >= 1)
    },
    "item_list": {
        "items": [                 # a list of items that can be of any type (required)
            <any>,
@@ -54,6 +58,10 @@ RESPONSE:
"api_id": <string>,
"sqs_url": <string>,
"timeout": <int>,
"sqs_dead_letter_queue": {
"arn": <string>,
"max_receive_count": <int>
},
"created_time": <string> # e.g. 2020-07-16T14:56:10.276007415Z
}
```
@@ -76,6 +84,10 @@ POST <batch_api_endpoint>:
{
    "workers": <int>,              # the number of workers to allocate for this job (required)
    "timeout": <int>,              # duration in seconds since the submission of a job before it is terminated (optional)
    "sqs_dead_letter_queue": {     # specify a queue to redirect failed batches (optional)
        "arn": <string>,               # ARN of the dead letter queue, e.g. arn:aws:sqs:us-west-2:123456789:failed.fifo
        "max_receive_count": <int>     # number of times a batch is allowed to be handled by a worker before it is considered failed and transferred to the dead letter queue (must be >= 1)
    },
    "file_path_lister": {
        "s3_paths": [<string>],    # can be S3 prefixes or complete S3 paths (required)
        "includes": [<string>],    # glob patterns (optional)
@@ -96,6 +108,10 @@ RESPONSE:
"api_id": <string>,
"sqs_url": <string>,
"timeout": <int>,
"sqs_dead_letter_queue": {
"arn": <string>,
"max_receive_count": <int>
},
"created_time": <string> # e.g. 2020-07-16T14:56:10.276007415Z
}
```
@@ -117,6 +133,10 @@ POST <batch_api_endpoint>:
{
    "workers": <int>,              # the number of workers to allocate for this job (required)
    "timeout": <int>,              # duration in seconds since the submission of a job before it is terminated (optional)
    "sqs_dead_letter_queue": {     # specify a queue to redirect failed batches (optional)
        "arn": <string>,               # ARN of the dead letter queue, e.g. arn:aws:sqs:us-west-2:123456789:failed.fifo
        "max_receive_count": <int>     # number of times a batch is allowed to be handled by a worker before it is considered failed and transferred to the dead letter queue (must be >= 1)
    },
    "delimited_files": {
        "s3_paths": [<string>],    # can be S3 prefixes or complete S3 paths (required)
        "includes": [<string>],    # glob patterns (optional)
@@ -137,6 +157,10 @@ RESPONSE:
"api_id": <string>,
"sqs_url": <string>,
"timeout": <int>,
"sqs_dead_letter_queue": {
"arn": <string>,
"max_receive_count": <int>
},
"created_time": <string> # e.g. 2020-07-16T14:56:10.276007415Z
}
```
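To make the schema above concrete, here is a minimal submission sketch in Python. The endpoint URL, queue ARN, S3 paths, and the `batch_size` field are hypothetical placeholders, not values from this PR. Note that the job queue is FIFO (the worker sends `MessageDeduplicationId`/`MessageGroupId`), and SQS requires the dead letter queue of a FIFO queue to also be FIFO, hence the `.fifo` suffix in the example ARN.

```python
# Hypothetical sketch: submit a batch job that redirects failed batches to a
# pre-created FIFO dead letter queue. Endpoint, ARN, and paths are placeholders.
import requests

job = {
    "workers": 2,
    "timeout": 3600,
    "sqs_dead_letter_queue": {
        "arn": "arn:aws:sqs:us-west-2:123456789:failed.fifo",
        "max_receive_count": 3,  # up to 3 attempts per batch before it moves to the DLQ
    },
    "item_list": {
        "items": ["s3://my-bucket/images/0001.jpg", "s3://my-bucket/images/0002.jpg"],
        "batch_size": 1,  # assumed field; see the full item_list schema above
    },
}

response = requests.post("https://abc123.elb.us-west-2.amazonaws.com/my-api", json=job)
print(response.json())
```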
@@ -163,8 +187,8 @@ RESPONSE:
"batches_in_queue": <int> # number of batches remaining in the queue
"batch_metrics": {
"succeeded": <int> # number of succeeded batches
"failed": int # number of failed batches
"avg_time_per_batch": <float> (optional) # only available if batches have been completed
"failed": int # number of failed attempts
"avg_time_per_batch": <float> (optional) # average time spent working on a batch (only considers successful attempts)
},
"worker_counts": { # worker counts are only available while a job is running
"pending": <int>, # number of workers that are waiting for compute resources to be provisioned
3 changes: 2 additions & 1 deletion docs/workloads/batch/example.md
@@ -5,7 +5,8 @@ Deploy batch APIs that can orchestrate distributed batch inference jobs on large
## Key features

* Distributed inference
* Fault tolerance with queues
* Automatic batch retries
* Collect failed batches for debugging (see the sketch after this list)
* Metrics and log aggregation
* `on_job_complete` webhook
* Scale to 0
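
As a hypothetical illustration of the debugging workflow above, failed batch payloads can be read back from the dead letter queue after the job finishes; the queue name and region below are placeholders:

```python
# Hypothetical sketch: pull failed batch payloads from the dead letter queue.
import json

import boto3

sqs = boto3.client("sqs", region_name="us-west-2")
queue_url = sqs.get_queue_url(QueueName="failed.fifo")["QueueUrl"]

while True:
    response = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=2)
    messages = response.get("Messages", [])
    if not messages:
        break
    for message in messages:
        print("failed batch:", json.loads(message["Body"]))
```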
226 changes: 146 additions & 80 deletions pkg/cortex/serve/start/batch.py
@@ -21,6 +21,7 @@
import threading
import math
import pathlib
import uuid

import boto3
import botocore
@@ -34,7 +35,11 @@
from cortex_internal.lib.exceptions import UserRuntimeException

API_LIVENESS_UPDATE_PERIOD = 5 # seconds
MAXIMUM_MESSAGE_VISIBILITY = 60 * 60 * 12 # 12 hours is the maximum message visibility
SQS_POLL_WAIT_TIME = 10 # seconds
MESSAGE_NOT_FOUND_SLEEP = 10 # seconds
INITIAL_MESSAGE_VISIBILITY = 30 # seconds
MESSAGE_RENEWAL_PERIOD = 15 # seconds
JOB_COMPLETE_MESSAGE_RENEWAL = 10 # seconds
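# note: the renewal period (15s) is half the initial visibility timeout (30s),
# so a message's visibility is always extended well before it can expire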

local_cache = {
"api_spec": None,
Expand All @@ -48,6 +53,10 @@
}


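# shared state between sqs_loop and the per-message renewal threads: adding a
# receipt handle to stop_renewal (under receipt_handle_mutex) tells its
# renewal thread to exit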
receipt_handle_mutex = threading.Lock()
stop_renewal = set()


def dimensions():
    return [
        {"Name": "APIName", "Value": local_cache["api_spec"].name},
@@ -67,6 +76,40 @@ def time_per_batch_metric(total_time_seconds):
    return {"MetricName": "TimePerBatch", "Dimensions": dimensions(), "Value": total_time_seconds}


def renew_message_visibility(receipt_handle: str):
    queue_url = local_cache["job_spec"]["sqs_url"]
    interval = MESSAGE_RENEWAL_PERIOD
    new_timeout = INITIAL_MESSAGE_VISIBILITY
    cur_time = time.time()

    while True:
        time.sleep((cur_time + interval) - time.time())
        cur_time += interval
        new_timeout += interval

        with receipt_handle_mutex:
            if receipt_handle in stop_renewal:
                stop_renewal.remove(receipt_handle)
                break

            try:
                local_cache["sqs_client"].change_message_visibility(
                    QueueUrl=queue_url, ReceiptHandle=receipt_handle, VisibilityTimeout=new_timeout
                )
            except botocore.exceptions.ClientError as e:
                if e.response["Error"]["Code"] == "InvalidParameterValue":
                    # unexpected; this error is thrown when attempting to renew a message that has been deleted
                    continue
                elif e.response["Error"]["Code"] == "AWS.SimpleQueueService.NonExistentQueue":
                    # there may be a delay between the cron deleting the queue and this worker stopping
                    logger().info(
                        "failed to renew message visibility because the queue was not found"
                    )
                else:
                    stop_renewal.remove(receipt_handle)
                    raise e


def build_predict_args(payload, batch_id):
    args = {}

@@ -102,49 +145,6 @@ def get_total_messages_in_queue():
    return visible_count, not_visible_count


def handle_on_complete(message):
    job_spec = local_cache["job_spec"]
    predictor_impl = local_cache["predictor_impl"]
    sqs_client = local_cache["sqs_client"]
    queue_url = job_spec["sqs_url"]
    receipt_handle = message["ReceiptHandle"]

    try:
        if not getattr(predictor_impl, "on_job_complete", None):
            sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)
            return True

        should_run_on_job_complete = False

        while True:
            visible_count, not_visible_count = get_total_messages_in_queue()

            # if there are other messages that are visible, release this message and get the other ones (should rarely happen for FIFO)
            if visible_count > 0:
                sqs_client.change_message_visibility(
                    QueueUrl=queue_url, ReceiptHandle=receipt_handle, VisibilityTimeout=0
                )
                return False

            if should_run_on_job_complete:
                # double check that the queue is still empty (except for the job_complete message)
                if not_visible_count <= 1:
                    logger().info("executing on_job_complete")
                    predictor_impl.on_job_complete()
                    sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)
                    return True
                else:
                    should_run_on_job_complete = False

            if not_visible_count <= 1:
                should_run_on_job_complete = True

            time.sleep(20)
    except:
        sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)
        raise


def sqs_loop():
    job_spec = local_cache["job_spec"]
    api_spec = local_cache["api_spec"]
@@ -159,52 +159,118 @@ def sqs_loop():
        response = sqs_client.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=1,
            WaitTimeSeconds=10,
            VisibilityTimeout=MAXIMUM_MESSAGE_VISIBILITY,
            WaitTimeSeconds=SQS_POLL_WAIT_TIME,
            VisibilityTimeout=INITIAL_MESSAGE_VISIBILITY,
            MessageAttributeNames=["All"],
        )

        if response.get("Messages") is None or len(response["Messages"]) == 0:
            if no_messages_found_in_previous_iteration:
                logger().info("no batches left in queue, exiting...")
                return
            else:
            visible_messages, invisible_messages = get_total_messages_in_queue()
            if visible_messages + invisible_messages == 0:
                if no_messages_found_in_previous_iteration:
                    logger().info("no batches left in queue, exiting...")
                    return
                no_messages_found_in_previous_iteration = True
                continue
        else:
            no_messages_found_in_previous_iteration = False

        message = response["Messages"][0]
            time.sleep(MESSAGE_NOT_FOUND_SLEEP)
            continue

        no_messages_found_in_previous_iteration = False
        message = response["Messages"][0]
        receipt_handle = message["ReceiptHandle"]

        if "MessageAttributes" in message and "job_complete" in message["MessageAttributes"]:
            handled_on_complete = handle_on_complete(message)
            if handled_on_complete:
                logger().info("no batches left in queue, job has been completed")
                return
        renewer = threading.Thread(
            target=renew_message_visibility, args=(receipt_handle,), daemon=True
        )
        renewer.start()

        if is_on_job_complete(message):
            handle_on_job_complete(message)
        else:
            handle_batch_message(message)


def is_on_job_complete(message) -> bool:
return "MessageAttributes" in message and "job_complete" in message["MessageAttributes"]


def handle_batch_message(message):
    job_spec = local_cache["job_spec"]
    predictor_impl = local_cache["predictor_impl"]
    sqs_client = local_cache["sqs_client"]
    queue_url = job_spec["sqs_url"]
    receipt_handle = message["ReceiptHandle"]
    api_spec = local_cache["api_spec"]

    start_time = time.time()

    try:
        logger().info(f"processing batch {message['MessageId']}")
        payload = json.loads(message["Body"])
        batch_id = message["MessageId"]
        predictor_impl.predict(**build_predict_args(payload, batch_id))

        api_spec.post_metrics(
            [success_counter_metric(), time_per_batch_metric(time.time() - start_time)]
        )
    except:
        api_spec.post_metrics([failed_counter_metric()])
        logger().exception(f"failed processing batch {message['MessageId']}")
        with receipt_handle_mutex:
            stop_renewal.add(receipt_handle)
            if job_spec.get("sqs_dead_letter_queue") is not None:
                sqs_client.change_message_visibility(  # return message
                    QueueUrl=queue_url, ReceiptHandle=receipt_handle, VisibilityTimeout=0
                )
            else:
                sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)
    else:
        with receipt_handle_mutex:
            stop_renewal.add(receipt_handle)
            sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)
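
A note on the mechanism in handle_batch_message above: when sqs_dead_letter_queue is configured, a failed batch is not sent to the DLQ explicitly. Its visibility timeout is reset to 0 so it returns to the job queue, and the queue's own redrive policy moves it to the dead letter queue once it has been received max_receive_count times. As a hypothetical sketch (queue names, region, and ARN are placeholders), a FIFO queue wired up that way could be created like this:

```python
# Hypothetical sketch: create a FIFO job queue whose redrive policy moves a
# message to the dead letter queue after it has been received 3 times.
import json

import boto3

sqs = boto3.client("sqs", region_name="us-west-2")
sqs.create_queue(
    QueueName="job.fifo",
    Attributes={
        "FifoQueue": "true",
        "VisibilityTimeout": "60",  # matches the 60-second default set in this PR
        "RedrivePolicy": json.dumps(
            {
                "deadLetterTargetArn": "arn:aws:sqs:us-west-2:123456789:failed.fifo",
                "maxReceiveCount": "3",
            }
        ),
    },
)
```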


def handle_on_job_complete(message):
    job_spec = local_cache["job_spec"]
    predictor_impl = local_cache["predictor_impl"]
    sqs_client = local_cache["sqs_client"]
    queue_url = job_spec["sqs_url"]
    receipt_handle = message["ReceiptHandle"]

    should_run_on_job_complete = False
    try:
        while True:
            visible_messages, invisible_messages = get_total_messages_in_queue()
            total_messages = visible_messages + invisible_messages
            if total_messages > 1:
                new_message_id = uuid.uuid4()
                time.sleep(JOB_COMPLETE_MESSAGE_RENEWAL)
                sqs_client.send_message(
                    QueueUrl=queue_url,
                    MessageBody='"job_complete"',
                    MessageAttributes={
                        "job_complete": {"StringValue": "true", "DataType": "String"},
                        "api_name": {"StringValue": job_spec["api_name"], "DataType": "String"},
                        "job_id": {"StringValue": job_spec["job_id"], "DataType": "String"},
                    },
                    MessageDeduplicationId=str(new_message_id),
                    MessageGroupId=str(new_message_id),
                )
                break
            else:
            # sometimes on_job_complete message will be released if there are other messages still to be processed
            continue

        try:
            logger().info(f"processing batch {message['MessageId']}")

            start_time = time.time()

            payload = json.loads(message["Body"])
            batch_id = message["MessageId"]
            predictor_impl.predict(**build_predict_args(payload, batch_id))

            api_spec.post_metrics(
                [success_counter_metric(), time_per_batch_metric(time.time() - start_time)]
            )
        except Exception:
            api_spec.post_metrics(
                [failed_counter_metric(), time_per_batch_metric(time.time() - start_time)]
            )
            logger().exception("failed to process batch")
        finally:
                if should_run_on_job_complete:
                    if getattr(predictor_impl, "on_job_complete", None):
                        logger().info("executing on_job_complete")
                        predictor_impl.on_job_complete()
                    break
                should_run_on_job_complete = True
            time.sleep(10)  # verify that the queue is empty one more time
    except:
        logger().exception("failed to handle on_job_complete")
        raise
    finally:
        with receipt_handle_mutex:
            stop_renewal.add(receipt_handle)
            sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)

