|
16 | 16 |
|
17 | 17 | import abc |
18 | 18 | import concurrent.futures |
19 | | -import functools |
20 | | -import operator |
21 | | - |
22 | | -import six |
23 | | -import tenacity |
24 | 19 |
|
| 20 | +from google.api.core import exceptions |
| 21 | +from google.api.core import retry |
25 | 22 | from google.api.core.future import _helpers |
26 | 23 | from google.api.core.future import base |
27 | 24 |
|
28 | 25 |
|
| 26 | +class _OperationNotComplete(Exception): |
| 27 | + """Private exception used for polling via retry.""" |
| 28 | + pass |
| 29 | + |
| 30 | + |
29 | 31 | class PollingFuture(base.Future): |
30 | 32 | """A Future that needs to poll some service to check its status. |
31 | 33 |
|
@@ -69,29 +71,21 @@ def _blocking_poll(self, timeout=None): |
69 | 71 | if self._result_set: |
70 | 72 | return |
71 | 73 |
|
72 | | - retry_on = tenacity.retry_if_result( |
73 | | - functools.partial(operator.is_not, True)) |
74 | | - # Use exponential backoff with jitter. |
75 | | - wait_on = ( |
76 | | - tenacity.wait_exponential(multiplier=1, max=10) + |
77 | | - tenacity.wait_random(0, 1)) |
78 | | - |
79 | | - if timeout is None: |
80 | | - retry = tenacity.retry(retry=retry_on, wait=wait_on) |
81 | | - else: |
82 | | - retry = tenacity.retry( |
83 | | - retry=retry_on, |
84 | | - wait=wait_on, |
85 | | - stop=tenacity.stop_after_delay(timeout)) |
| 74 | + def done_or_raise(): |
| 75 | + """Checks if the future is done and raises if it's not.""" |
| 76 | + if not self.done(): |
| 77 | + raise _OperationNotComplete() |
| 78 | + |
| 79 | + retry_ = retry.Retry( |
| 80 | + predicate=retry.if_exception_type(_OperationNotComplete), |
| 81 | + deadline=timeout) |
86 | 82 |
|
87 | 83 | try: |
88 | | - retry(self.done)() |
89 | | - except tenacity.RetryError as exc: |
90 | | - six.raise_from( |
91 | | - concurrent.futures.TimeoutError( |
92 | | - 'Operation did not complete within the designated ' |
93 | | - 'timeout.'), |
94 | | - exc) |
| 84 | + retry_(done_or_raise)() |
| 85 | + except exceptions.RetryError: |
| 86 | + raise concurrent.futures.TimeoutError( |
| 87 | + 'Operation did not complete within the designated ' |
| 88 | + 'timeout.') |
95 | 89 |
|
96 | 90 | def result(self, timeout=None): |
97 | 91 | """Get the result of the operation, blocking if necessary. |
|
0 commit comments