
Commit

Update error handling
bgklein committed Sep 5, 2018
1 parent 70e39ec commit dca1f06
Showing 6 changed files with 974 additions and 950 deletions.
Empty file added azure-batch/__init__.py
Empty file.
azure-batch/azure/batch/custom/custom_errors.py
@@ -2,15 +2,18 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from azure.batch.models import BatchErrorException

class CreateTasksErrorException(BatchErrorException):
"""

class CreateTasksErrorException(Exception):
""" Aggregate Exception containing details for any failures from a task add operation.
:param str message: Error message describing exit reason
:param [~TaskAddResult] failures: List of tasks with detected client side errors.
:param [~TaskAddParameter] pending_task_list: List of tasks remaining to be submitted.
:param [~TaskAddResult] failure_tasks: List of tasks which failed to add
:param [~Exception] errors: List of unknown errors forcing early termination
"""
def __init__(self, message, failures, pending_task_list):
def __init__(self, message, pending_task_list=None, failure_tasks=None, errors=None):
self.message = message
self.failures = list(failures)
self.pending_tasks = list(pending_task_list)
self.failure_tasks = list(failure_tasks)
self.errors = list(errors)
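
For orientation, here is a minimal sketch of constructing and inspecting the reworked aggregate exception. It is not part of the commit: the field names come from the class as shown in this diff, and the empty lists are placeholders for real TaskAddResult / TaskAddParameter / Exception objects.

    from azure.batch.models import CreateTasksErrorException

    # Placeholders for the real result/parameter/exception objects.
    client_side_failures = []   # TaskAddResult entries with client-side errors
    unsubmitted_tasks = []      # TaskAddParameter entries that were never sent
    unexpected_errors = []      # exceptions that forced early termination

    # Passing explicit (possibly empty) sequences matters: the constructor wraps
    # each argument in list(), so the None defaults would raise a TypeError.
    err = CreateTasksErrorException(
        "One or more tasks failed to be added",
        pending_task_list=unsubmitted_tasks,
        failure_tasks=client_side_failures,
        errors=unexpected_errors,
    )

    print(err.message)
    print(len(err.pending_tasks), len(err.failure_tasks), len(err.errors))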
71 changes: 36 additions & 35 deletions azure-batch/azure/batch/custom/patch.py
@@ -3,26 +3,21 @@
import threading
import types

from . import errors
from .. import models
from ..models import BatchErrorException, TaskAddCollectionResult, TaskAddCollectionParameter, TaskAddStatus
from ..models import BatchErrorException, TaskAddCollectionResult, TaskAddStatus, CreateTasksErrorException
from ..operations.task_operations import TaskOperations

from msrest import Serializer, Deserializer
from msrest.pipeline import ClientRawResponse

MAX_TASKS_PER_REQUEST = 100
_LOGGER = logging.getLogger(__name__)

class _TaskWorkflowManager(object):
"""Worker class for one add_collection request
:param ExtendedTaskOperations task_operations: Parent object which instantiated this
:param ~TaskOperations task_operations: Parent object which instantiated this
:param job_id: The ID of the job to which the task collection is to be
added.
:type job_id: str
:param value: The collection of tasks to add.
:type value: list of :class:`TaskAddParameter
:param tasks_to_add: The collection of tasks to add.
:type tasks_to_add: list of :class:`TaskAddParameter
<azure.batch.models.TaskAddParameter>`
:param task_add_collection_options: Additional parameters for the
operation
@@ -42,12 +37,11 @@ def __init__(
custom_headers=None,
raw=False,
**kwargs):
# No complex operations - No lock needed
self._has_early_termination_error = False

# Append operations thread safe
# Only read once all threads have completed
self._failures = collections.deque()
# Append operations thread safe - Only read once all threads have completed
# List of tasks which failed to add due to a returned client error
self._failure_tasks = collections.deque()
# List of unknown exceptions which occurred during requests.
self._errors = collections.deque()

# synchronized through lock variables
self.error = None # Only written once all threads have completed
@@ -68,7 +62,10 @@ def __init__(
self._kwargs = dict(**kwargs)

def _bulk_add_tasks(self, results_queue, chunk_tasks_to_add):
"""Adds a chunk of tasks to the job and retry if chunk
"""Adds a chunk of tasks to the job
Retry chunk if body exceeds the maximum request size and retry tasks
if failed due to server errors.
:param results_queue: Queue to place the return value of the request
:type results_queue: collections.deque
@@ -94,10 +91,9 @@ def _bulk_add_tasks(self, results_queue, chunk_tasks_to_add):
# 3) Sum of all cells exceeds max row limit
if len(chunk_tasks_to_add) == 1:
failed_task = chunk_tasks_to_add.pop()
results_queue.appendleft(e)
_LOGGER.error("Task ID %s failed to add due to exceeding the request"
" body being too large", failed_task.id)
self._has_early_termination_error = True
self._errors.appendleft(e)
_LOGGER.error("Failed to add task with ID %s due to the body"
" exceeding the maximum request size", failed_task.id)
else:
# Assumption: Tasks are relatively close in size therefore if one batch exceeds size limit
# we should decrease the initial task collection size to avoid repeating the error
@@ -118,12 +114,19 @@ def _bulk_add_tasks(self, results_queue, chunk_tasks_to_add):
# appends extra tasks to queue to be picked up by another thread .
self._tasks_to_add.extendleft(chunk_tasks_to_add[midpoint:])
self._bulk_add_tasks(results_queue, chunk_tasks_to_add[:midpoint])
# Retry server side errors
elif 500 <= e.response.status_code <= 599:
self._tasks_to_add.extendleft(chunk_tasks_to_add)
else:
results_queue.appendleft(e)
# Re-add to pending queue as unknown status / don't have result
self._tasks_to_add.extendleft(chunk_tasks_to_add)
# Unknown State - don't know if tasks failed to add or were successful
self._errors.appendleft(e)
except Exception as e: # pylint: disable=broad-except
results_queue.appendleft(e)
# Re-add to pending queue as unknown status / don't have result
self._tasks_to_add.extendleft(chunk_tasks_to_add)
# Unknown State - don't know if tasks failed to add or were successful
self._errors.appendleft(e)
else:
try:
add_collection_response = add_collection_response.output
@@ -140,7 +143,7 @@ def _bulk_add_tasks(self, results_queue, chunk_tasks_to_add):
elif (task_result.status == TaskAddStatus.client_error
and not task_result.error.code == "TaskExists"):
# Client error will be recorded unless Task already exists
self._failures.appendleft(task_result)
self._failure_tasks.appendleft(task_result)
else:
results_queue.appendleft(task_result)
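
The split-and-retry behaviour spread across the hunks above can be hard to follow in diff form. The following standalone sketch models the same idea in a simplified, single-threaded way; RequestTooLargeError and the submit callable are stand-ins for the real RequestBodyTooLarge error and the add-collection request, not the actual client API.

    class RequestTooLargeError(Exception):
        """Stand-in for the RequestBodyTooLarge service error."""

    def add_in_chunks(submit, tasks):
        """Submit tasks, halving the chunk size whenever a request body is too large."""
        pending = list(tasks)
        chunk_size = max(1, len(pending))
        while pending:
            chunk, pending = pending[:chunk_size], pending[chunk_size:]
            try:
                submit(chunk)
            except RequestTooLargeError:
                if len(chunk) == 1:
                    # A single task already exceeds the limit; the real code logs
                    # this and records the error rather than retrying.
                    raise
                # Halve the chunk size and push the tasks back onto the front of
                # the queue, mirroring the midpoint split in _bulk_add_tasks.
                chunk_size = max(1, len(chunk) // 2)
                pending = chunk + pending

    # A fake submitter that only accepts three tasks per request.
    def fake_submit(chunk):
        if len(chunk) > 3:
            raise RequestTooLargeError()
        print("added", chunk)

    add_in_chunks(fake_submit, list(range(10)))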

@@ -151,7 +154,8 @@ def task_collection_thread_handler(self, results_queue):
:param collections.deque results_queue: Queue for worker to output results to
"""
while self._tasks_to_add and not self._has_early_termination_error:
# Add tasks until either we run out or we run into an unexpected error
while self._tasks_to_add and not self._errors:
max_tasks = self._max_tasks_per_request # local copy
chunk_tasks_to_add = []
with self._pending_queue_lock:
@@ -161,20 +165,20 @@
if chunk_tasks_to_add:
self._bulk_add_tasks(results_queue, chunk_tasks_to_add)


# Only define error if all threads have finished and there were failures
with self._error_lock:
if threading.active_count() == 1 and self._failures:
self.error = errors.CreateTasksErrorException(
if threading.active_count() == 1 and (self._failure_tasks or self._errors):
self.error = CreateTasksErrorException(
"One or more tasks failed to be added",
self._failures,
self._tasks_to_add)
self._failure_tasks,
self._tasks_to_add,
self._errors)


def _handle_output(results_queue):
"""Scan output for exceptions
If there is an output from an add task collection call add it to the results or throw the respective exception.
If there is an output from an add task collection call add it to the results.
:param results_queue: Queue containing results of attempted add_collection's
:type results_queue: collections.deque
@@ -184,10 +188,7 @@ def _handle_output(results_queue):
results = []
while results_queue:
queue_item = results_queue.pop()
if isinstance(queue_item, Exception):
raise queue_item
else:
results.append(queue_item)
results.append(queue_item)
return results

def patch_client(client):
@@ -279,9 +280,9 @@ def bulk_add_collection(
else:
task_workflow_manager.task_collection_thread_handler(results_queue)

submitted_tasks = _handle_output(results_queue)
if task_workflow_manager.error:
raise task_workflow_manager.error # pylint: disable=raising-bad-type
else:
submitted_tasks = _handle_output(results_queue)
return TaskAddCollectionResult(value=submitted_tasks)
bulk_add_collection.metadata = {'url': '/jobs/{jobId}/addtaskcollection'}
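
Tying the pieces together, a caller of the patched client might look roughly like the sketch below. patch_client, the add_collection name, and the exception fields are taken from this diff; the credentials, account URL, job ID, and task definitions are placeholders, and the exact client constructor arguments may differ between SDK versions.

    from azure.batch import BatchServiceClient
    from azure.batch.batch_auth import SharedKeyCredentials
    from azure.batch.custom.patch import patch_client
    from azure.batch.models import CreateTasksErrorException, TaskAddParameter

    credentials = SharedKeyCredentials("<account>", "<key>")  # placeholder values
    client = BatchServiceClient(credentials, base_url="https://<account>.<region>.batch.azure.com")
    patch_client(client)  # assumed to rebind task.add_collection to the bulk version above

    tasks = [TaskAddParameter(id="task-%d" % i, command_line="/bin/true") for i in range(500)]

    try:
        result = client.task.add_collection("my-job", tasks)
    except CreateTasksErrorException as exc:
        # Aggregate failure: report client-side rejections, tasks that were never
        # submitted, and any unexpected errors that stopped the workers early.
        for task_result in exc.failure_tasks:
            print(task_result.task_id, task_result.error.code)
        print("unsubmitted:", len(exc.pending_tasks), "unexpected errors:", len(exc.errors))
    else:
        print("submitted %d tasks" % len(result.value))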
5 changes: 5 additions & 0 deletions azure-batch/azure/batch/models/__init__.py
@@ -9,6 +9,10 @@
# regenerated.
# --------------------------------------------------------------------------

######DO NOT REMOVE######
from ..custom.custom_errors import CreateTasksErrorException
######DO NOT REMOVE######

try:
from .pool_usage_metrics_py3 import PoolUsageMetrics
from .image_reference_py3 import ImageReference
@@ -467,6 +471,7 @@
)

__all__ = [
'CreateTasksErrorException',
'PoolUsageMetrics',
'ImageReference',
'NodeAgentSku',
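
The net effect of the models/__init__.py change is simply that the custom exception is re-exported from the public models namespace, so a sketch like the following would hold, assuming the package layout shown in this diff.

    # Both names refer to the same class after this commit.
    from azure.batch.models import CreateTasksErrorException
    from azure.batch.custom.custom_errors import CreateTasksErrorException as _same

    assert CreateTasksErrorException is _same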
