Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 20 additions & 26 deletions core/google/api/core/future/polling.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@

import abc
import concurrent.futures
import functools
import operator

import six
import tenacity

from google.api.core import exceptions
from google.api.core import retry
from google.api.core.future import _helpers
from google.api.core.future import base


class _OperationNotComplete(Exception):
"""Private exception used for polling via retry."""
pass


class PollingFuture(base.Future):
"""A Future that needs to poll some service to check its status.

Expand Down Expand Up @@ -55,6 +57,11 @@ def done(self):
# pylint: disable=redundant-returns-doc, missing-raises-doc
raise NotImplementedError()

def _done_or_raise(self):
"""Check if the future is done and raise if it's not."""
if not self.done():
raise _OperationNotComplete()

def running(self):
"""True if the operation is currently running."""
return not self.done()
Expand All @@ -69,29 +76,16 @@ def _blocking_poll(self, timeout=None):
if self._result_set:
return

retry_on = tenacity.retry_if_result(
functools.partial(operator.is_not, True))
# Use exponential backoff with jitter.
wait_on = (
tenacity.wait_exponential(multiplier=1, max=10) +
tenacity.wait_random(0, 1))

if timeout is None:
retry = tenacity.retry(retry=retry_on, wait=wait_on)
else:
retry = tenacity.retry(
retry=retry_on,
wait=wait_on,
stop=tenacity.stop_after_delay(timeout))
retry_ = retry.Retry(
predicate=retry.if_exception_type(_OperationNotComplete),
deadline=timeout)

try:
retry(self.done)()
except tenacity.RetryError as exc:
six.raise_from(
concurrent.futures.TimeoutError(
'Operation did not complete within the designated '
'timeout.'),
exc)
retry_(self._done_or_raise)()
except exceptions.RetryError:
raise concurrent.futures.TimeoutError(
'Operation did not complete within the designated '
'timeout.')

def result(self, timeout=None):
"""Get the result of the operation, blocking if necessary.
Expand Down
1 change: 0 additions & 1 deletion core/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
'requests >= 2.18.0, < 3.0.0dev',
'setuptools >= 34.0.0',
'six',
'tenacity >= 4.0.0, <5.0.0dev'
]

EXTRAS_REQUIREMENTS = {
Expand Down