Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reimplement retry mechanism on 429 at the http request level and for all databricks-cli operations #343

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 7 additions & 95 deletions databricks_cli/dbfs/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,14 @@
import tempfile

import re
import functools
import click

from tenacity import retry, wait_random_exponential, retry_if_exception_type, stop_after_attempt
from requests.exceptions import HTTPError

from databricks_cli.sdk import DbfsService
from databricks_cli.utils import error_and_quit
from databricks_cli.dbfs.dbfs_path import DbfsPath
from databricks_cli.dbfs.exceptions import LocalFileExistsException, RateLimitException
from databricks_cli.dbfs.exceptions import LocalFileExistsException

BUFFER_SIZE_BYTES = 2**20

Expand Down Expand Up @@ -77,78 +75,12 @@ class DbfsErrorCodes(object):
RESOURCE_DOES_NOT_EXIST = 'RESOURCE_DOES_NOT_EXIST'
RESOURCE_ALREADY_EXISTS = 'RESOURCE_ALREADY_EXISTS'
PARTIAL_DELETE = 'PARTIAL_DELETE'
TOO_MANY_REQUESTS = 'TOO_MANY_REQUESTS'


class CustomRetryState(object):
    """Mutable holder for the cumulative sleep time observed across 429 retries.

    Shared with the tenacity ``before_sleep`` callback so each attempt can
    report only the *incremental* wait, not the running total.
    """

    def __init__(self):
        # Start from a clean slate; a fresh state is equivalent to a reset one.
        self.reset()

    def reset(self):
        # Seconds slept over all retries so far; zeroed before each new call.
        self.time_for_last_retry = 0


class Retry429(object):
    """Method decorator that retries a call when the server answers HTTP 429.

    Wraps the decorated function with tenacity: random exponential backoff,
    capped per-wait, bounded attempt count, and the original exception
    re-raised once attempts are exhausted (``reraise=True``).
    """

    # Multiplier fed to tenacity's wait_random_exponential backoff.
    EXPONENTIAL_BACKOFF_MULTIPLIER = 1
    # Upper bound (seconds) on any single wait between attempts.
    MAX_SECONDS_WAIT = 60
    # Total attempts (first call + retries) before giving up.
    MAX_RETRY_ATTEMPTS = 8

    def __init__(self, func):
        """
        If there are no decorator arguments, the function to be decorated is passed to
        the constructor. It is called only once for each function decorated with it.
        """
        # Per-decorated-function state; reset on every outer call so the
        # "time until next retry" log line is computed per call, not globally.
        self.retry_state_429 = CustomRetryState()

        @retry(wait=wait_random_exponential(multiplier=self.EXPONENTIAL_BACKOFF_MULTIPLIER,
               max=self.MAX_SECONDS_WAIT), retry=retry_if_exception_type(RateLimitException),
               stop=stop_after_attempt(self.MAX_RETRY_ATTEMPTS), reraise=True,
               before_sleep=lambda retry_state: self.before_sleep_on_429(retry_state,
                                                                         self.retry_state_429))
        def wrapped_function(*args, **kwargs):
            # Translate only HTTP 429 into RateLimitException so tenacity's
            # retry_if_exception_type fires; every other HTTPError propagates.
            try:
                return func(*args, **kwargs)
            except HTTPError as e:
                if e.response.status_code == 429:
                    raise RateLimitException("429 Too Many Requests")
                raise e

        self.func = wrapped_function

    def __call__(self, *args, **kwargs):
        """
        The __call__ method is called every time a decorated function is called.
        """
        # Fresh retry bookkeeping for this invocation before entering tenacity.
        self.retry_state_429.reset()

        return self.func(*args, **kwargs)

    def __get__(self, obj, objtype):
        """
        Making this decorator a descriptor such that we can use it on class methods.
        See https://stackoverflow.com/a/3296318/12359607
        """
        # Binds the instance as the first argument, emulating a bound method.
        return functools.partial(self.__call__, obj)

    @staticmethod
    def before_sleep_on_429(retry_state, retry_state_429):
        """
        Note: Here idle_for represents the total time spent sleeping in all retries so far +
        the time that we will sleep until the next retry. We determined this empirically,
        as it is not clearly stated in the Tenacity docs.
        """
        # Difference between cumulative totals gives just the upcoming sleep.
        time_until_next_retry = retry_state.idle_for - retry_state_429.time_for_last_retry
        click.echo(("Received 429 REQUEST_LIMIT_EXCEEDED for attempt {}. "
                    "Retrying in {:.2f} seconds.").format(retry_state.attempt_number,
                                                          time_until_next_retry))
        # Remember the new cumulative total for the next callback invocation.
        retry_state_429.time_for_last_retry = retry_state.idle_for


class DbfsApi(object):
def __init__(self, api_client):
self.client = DbfsService(api_client)

@Retry429
def list_files(self, dbfs_path, headers=None):
list_response = self.client.list(dbfs_path.absolute_path, headers=headers)
if 'files' in list_response:
Expand All @@ -169,37 +101,20 @@ def file_exists(self, dbfs_path, headers=None):
raise e
return True

@Retry429
def get_status(self, dbfs_path, headers=None):
json = self.client.get_status(dbfs_path.absolute_path, headers=headers)
return FileInfo.from_json(json)

@Retry429
def create(self, dbfs_path, overwrite, headers):
return self.client.create(dbfs_path.absolute_path, overwrite, headers=headers)

@Retry429
def add_block(self, handle, contents, headers):
self.client.add_block(handle, contents, headers=headers)

@Retry429
def close(self, handle, headers):
self.client.close(handle, headers=headers)

def put_file(self, src_path, dbfs_path, overwrite, headers=None):
handle = self.create(dbfs_path, overwrite, headers=headers)['handle']
handle = self.client.create(dbfs_path.absolute_path, overwrite, headers=headers)['handle']
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

more for SDK side of things - we should consider some day later the "context manager" for DBFS files:

with self.client.open(dbfs_path.absolute_path) as dbfs_file:
    # alternatively ".write(contents)" - that will do base64 stuff
    dbfs_file.add_block(base64encode(...), ...)

with open(src_path, 'rb') as local_file:
while True:
contents = local_file.read(BUFFER_SIZE_BYTES)
if len(contents) == 0:
break
# add_block should not take a bytes object.
self.add_block(handle, b64encode(contents).decode(), headers=headers)
self.close(handle, headers=headers)

@Retry429
def read(self, dbfs_path, offset, headers):
return self.client.read(dbfs_path.absolute_path, offset, BUFFER_SIZE_BYTES, headers=headers)
self.client.add_block(handle, b64encode(contents).decode(), headers=headers)
self.client.close(handle, headers=headers)

def get_file(self, dbfs_path, dst_path, overwrite, headers=None):
if os.path.exists(dst_path) and not overwrite:
Expand All @@ -211,7 +126,8 @@ def get_file(self, dbfs_path, dst_path, overwrite, headers=None):
offset = 0
with open(dst_path, 'wb') as local_file:
while offset < length:
response = self.read(dbfs_path, offset, headers=headers)
response = self.client.read(dbfs_path.absolute_path, offset, BUFFER_SIZE_BYTES,
headers=headers)
bytes_read = response['bytes_read']
data = response['data']
offset += bytes_read
Expand All @@ -230,13 +146,11 @@ def get_num_files_deleted(partial_delete_error):
message))
return int(m.group(1))

@Retry429
def delete(self, dbfs_path, recursive, headers=None):
num_files_deleted = 0
while True:
try:
self.client.delete(dbfs_path.absolute_path,
recursive=recursive, headers=headers)
self.client.delete(dbfs_path.absolute_path, recursive=recursive, headers=headers)
except HTTPError as e:
if e.response.status_code == 503:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

@bogdanghita-db bogdanghita-db Oct 19, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you say there will be global retries on 503? I am overriding the default status codes with status_forcelist=[429] in databricks_cli/sdk/api_client.py to ensure retries are made only for 429. These are only the defaults:

#: Default status codes to be used for ``status_forcelist``
    RETRY_AFTER_STATUS_CODES = frozenset([413, 429, 503])

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then good

try:
Expand All @@ -258,11 +172,9 @@ def delete(self, dbfs_path, recursive, headers=None):
break
click.echo("\rDelete finished successfully.\033[K")

@Retry429
def mkdirs(self, dbfs_path, headers=None):
self.client.mkdirs(dbfs_path.absolute_path, headers=headers)

@Retry429
def move(self, dbfs_src, dbfs_dst, headers=None):
self.client.move(dbfs_src.absolute_path, dbfs_dst.absolute_path, headers=headers)

Expand Down
4 changes: 0 additions & 4 deletions databricks_cli/dbfs/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,3 @@

class LocalFileExistsException(Exception):
pass


class RateLimitException(Exception):
pass
12 changes: 11 additions & 1 deletion databricks_cli/sdk/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@
try:
from requests.packages.urllib3.poolmanager import PoolManager
from requests.packages.urllib3 import exceptions
from requests.packages.urllib3.util.retry import Retry
except ImportError:
from urllib3.poolmanager import PoolManager
from urllib3 import exceptions
from urllib3.util.retry import Retry

from databricks_cli.version import version as databricks_cli_version

Expand All @@ -70,8 +72,16 @@ def __init__(self, user=None, password=None, host=None, token=None,
if host[-1] == "/":
host = host[:-1]

retries = Retry(
total=6,
backoff_factor=1,
status_forcelist=[429],
method_whitelist=set({'POST'}) | set(Retry.DEFAULT_METHOD_WHITELIST),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use DEFAULT_ALLOWED_METHODS, because DEFAULT_METHOD_WHITELIST is deprecated.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new constant is not available in urllib3 v1.25.10, which seems to be installed with databricks-cli by default. It was introduced in v1.26.0. Confirmed by trying to use it:

Error: AttributeError: type object 'Retry' has no attribute 'DEFAULT_ALLOWED_METHODS'

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, then we need to schedule version updates, as this code would have to be updated down the line anyway

respect_retry_after_header=True,
raise_on_status=False # return original response when retries have been exhausted
)
self.session = requests.Session()
self.session.mount('https://', TlsV1HttpAdapter())
self.session.mount('https://', TlsV1HttpAdapter(max_retries=retries))

parsed_url = urlparse(host)
scheme = parsed_url.scheme
Expand Down
1 change: 0 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
'tabulate>=0.7.7',
'six>=1.10.0',
'configparser>=0.3.5;python_version < "3.6"',
'tenacity>=6.2.0'
],
entry_points='''
[console_scripts]
Expand Down
Loading