Skip to content

Commit

Permalink
[Tables] Fix for async retry policy (Azure#17810)
Browse files Browse the repository at this point in the history
* Fix bug in async retry policy

* Added some retry testing

* Updated release notes

* Review feedback

* Moved static methods
  • Loading branch information
annatisch authored Apr 6, 2021
1 parent e3495f8 commit 824665d
Show file tree
Hide file tree
Showing 15 changed files with 1,580 additions and 82 deletions.
1 change: 1 addition & 0 deletions sdk/tables/azure-data-tables/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 12.0.0b6 (2021-04-06)
* Updated deserialization of datetime fields in entities to support preservation of the service format with additional decimal place.
* Passing a string parameter into a query filter will now be escaped to protect against injection.
* Fixed bug in incrementing retries in async retry policy

## 12.0.0b5 (2021-03-09)
* This version and all future versions will require Python 2.7 or Python 3.6+, Python 3.5 is no longer supported.
Expand Down
151 changes: 74 additions & 77 deletions sdk/tables/azure-data-tables/azure/data/tables/_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,78 @@ def is_retry(response, mode):
return True
return False

def set_next_host_location(settings, request):
"""
A function which sets the next host location on the request, if applicable.
:param ~azure.storage.models.RetryContext context:
The retry context containing the previous host location and the request
to evaluate and possibly modify.
"""
if settings["hosts"] and all(settings["hosts"].values()):
url = urlparse(request.url)
# If there's more than one possible location, retry to the alternative
if settings["mode"] == LocationMode.PRIMARY:
settings["mode"] = LocationMode.SECONDARY
else:
settings["mode"] = LocationMode.PRIMARY
updated = url._replace(netloc=settings["hosts"].get(settings["mode"]))
request.url = updated.geturl()

def increment(settings, request, response=None, error=None):
"""Increment the retry counters.
:param Any request:
:param dict settings:
:param Any response: A pipeline response object.
:param Any error: An error encountered during the request, or
None if the response was received successfully.
:keyword callable cls: A custom type or function that will be passed the direct response
:return: Whether the retry attempts are exhausted.
:rtype: None
"""
settings["total"] -= 1

if error and isinstance(error, ServiceRequestError):
# Errors when we're fairly sure that the server did not receive the
# request, so it should be safe to retry.
settings["connect"] -= 1
settings["history"].append(RequestHistory(request, error=error))

elif error and isinstance(error, ServiceResponseError):
# Errors that occur after the request has been started, so we should
# assume that the server began processing it.
settings["read"] -= 1
settings["history"].append(RequestHistory(request, error=error))

else:
# Incrementing because of a server error like a 500 in
# status_forcelist and a the given method is in the whitelist
if response:
settings["status"] -= 1
settings["history"].append(
RequestHistory(request, http_response=response)
)

if not is_exhausted(settings):
if request.method not in ["PUT"] and settings["retry_secondary"]:
set_next_host_location(settings, request)

# rewind the request body if it is a stream
if request.body and hasattr(request.body, "read"):
# no position was saved, then retry would not work
if settings["body_position"] is None:
return False
try:
# attempt to rewind the body to the initial position
request.body.seek(settings["body_position"], SEEK_SET)
except (UnsupportedOperation, ValueError):
# if body is not seekable, then retry would not work
return False
settings["count"] += 1
return True
return False


def urljoin(base_url, stub_url):
parsed = urlparse(base_url)
Expand Down Expand Up @@ -464,24 +536,6 @@ def get_backoff_time(self, settings):
random_range_end = backoff + self.random_jitter_range
return random_generator.uniform(random_range_start, random_range_end)

def _set_next_host_location(self, settings, request): # pylint: disable=no-self-use
"""
A function which sets the next host location on the request, if applicable.
:param ~azure.storage.models.RetryContext context:
The retry context containing the previous host location and the request
to evaluate and possibly modify.
"""
if settings["hosts"] and all(settings["hosts"].values()):
url = urlparse(request.url)
# If there's more than one possible location, retry to the alternative
if settings["mode"] == LocationMode.PRIMARY:
settings["mode"] = LocationMode.SECONDARY
else:
settings["mode"] = LocationMode.PRIMARY
updated = url._replace(netloc=settings["hosts"].get(settings["mode"]))
request.url = updated.geturl()

def configure_retries(
self, request
): # pylint: disable=no-self-use, arguments-differ
Expand Down Expand Up @@ -530,63 +584,6 @@ def sleep(self, settings, transport): # pylint: disable=arguments-differ
return
transport.sleep(backoff)

def increment(
self, settings, request, response=None, error=None, **kwargs
): # pylint: disable=unused-argument, arguments-differ
# type: (...)->None
"""Increment the retry counters.
:param Any request:
:param dict settings:
:param Any response: A pipeline response object.
:param Any error: An error encountered during the request, or
None if the response was received successfully.
:keyword callable cls: A custom type or function that will be passed the direct response
:return: Whether the retry attempts are exhausted.
:rtype: None
"""
settings["total"] -= 1

if error and isinstance(error, ServiceRequestError):
# Errors when we're fairly sure that the server did not receive the
# request, so it should be safe to retry.
settings["connect"] -= 1
settings["history"].append(RequestHistory(request, error=error))

elif error and isinstance(error, ServiceResponseError):
# Errors that occur after the request has been started, so we should
# assume that the server began processing it.
settings["read"] -= 1
settings["history"].append(RequestHistory(request, error=error))

else:
# Incrementing because of a server error like a 500 in
# status_forcelist and a the given method is in the whitelist
if response:
settings["status"] -= 1
settings["history"].append(
RequestHistory(request, http_response=response)
)

if not is_exhausted(settings):
if request.method not in ["PUT"] and settings["retry_secondary"]:
self._set_next_host_location(settings, request)

# rewind the request body if it is a stream
if request.body and hasattr(request.body, "read"):
# no position was saved, then retry would not work
if settings["body_position"] is None:
return False
try:
# attempt to rewind the body to the initial position
request.body.seek(settings["body_position"], SEEK_SET)
except (UnsupportedOperation, ValueError):
# if body is not seekable, then retry would not work
return False
settings["count"] += 1
return True
return False

def send(self, request):
"""
:param Any request:
Expand All @@ -599,7 +596,7 @@ def send(self, request):
try:
response = self.next.send(request)
if is_retry(response, retry_settings["mode"]):
retries_remaining = self.increment(
retries_remaining = increment(
retry_settings,
request=request.http_request,
response=response.http_response,
Expand All @@ -615,7 +612,7 @@ def send(self, request):
continue
break
except AzureError as err:
retries_remaining = self.increment(
retries_remaining = increment(
retry_settings, request=request.http_request, error=err
)
if retries_remaining:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from azure.core.pipeline.policies import AsyncHTTPPolicy, AsyncRetryPolicy
from azure.core.exceptions import AzureError

from .._policies import is_retry, TablesRetryPolicy
from .._policies import is_retry, increment, TablesRetryPolicy

if TYPE_CHECKING:
from azure.core.pipeline import PipelineRequest, PipelineResponse
Expand Down Expand Up @@ -168,8 +168,10 @@ async def send(self, request):
try:
response = await self.next.send(request)
if is_retry(response, retry_settings["mode"]):
retries_remaining = self.increment(
retry_settings, response=response.http_response
retries_remaining = increment(
retry_settings,
request=request.http_request,
response=response.http_response
)
if retries_remaining:
await retry_hook(
Expand All @@ -182,7 +184,8 @@ async def send(self, request):
continue
break
except AzureError as err:
retries_remaining = self.increment(retry_settings, error=err)
retries_remaining = increment(
retry_settings, request=request.http_request, error=err)
if retries_remaining:
await retry_hook(
retry_settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,11 +241,12 @@ async def create_table(
:dedent: 8
:caption: Creating a table from the TableClient object.
"""
table_properties = TableProperties(table_name=self.table_name, **kwargs)
table_properties = TableProperties(table_name=self.table_name)
try:
metadata, _ = await self._client.table.create(
table_properties,
cls=kwargs.pop("cls", _return_headers_and_deserialized),
**kwargs
)
return _trim_service_metadata(metadata)
except HttpResponseError as error:
Expand Down
27 changes: 27 additions & 0 deletions sdk/tables/azure-data-tables/tests/_shared/testcase.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,30 @@ def generate_sas_token(self):

def generate_fake_token(self):
return FakeTokenCredential()


class ResponseCallback(object):
def __init__(self, status=None, new_status=None):
self.status = status
self.new_status = new_status
self.first = True
self.count = 0

def override_first_status(self, response):
if self.first and response.http_response.status_code == self.status:
response.http_response.status_code = self.new_status
self.first = False
self.count += 1

def override_status(self, response):
if response.http_response.status_code == self.status:
response.http_response.status_code = self.new_status
self.count += 1


class RetryCounter(object):
def __init__(self):
self.count = 0

def simple_count(self, retry_context):
self.count += 1
Loading

0 comments on commit 824665d

Please sign in to comment.