replace naive retry with tenacity #3026
Conversation
@hirosassa I merged #3022 so I guess you would like to rebase. Is this PR ready for review?
luigi/rpc.py
Outdated
import base64

from urllib.parse import urljoin, urlencode, urlparse
from urllib.request import urlopen, Request
from urllib.error import URLError

from tenacity import retry
We can import all of them in one line.
Thanks! fixed in bda80d5
if self._rpc_log_retries:
    logger.info("Wait for %d seconds" % self._rpc_retry_wait)
I think it is better to keep this log. You never know what users would do with it.
added wait log and fixed test in 8f36797
luigi/rpc.py
Outdated
@@ -144,35 +146,36 @@ def __init__(self, url='http://localhost:8082/', connect_timeout=None):
         else:
             self._fetcher = URLLibFetcher()

-    def _wait(self):
+    def _get_retry_decorator(self):
Could this be simplified as:
@retry(wait=wait_fixed(self._rpc_retry_wait),
       stop=stop_after_attempt(self._rpc_retry_attempts),
       reraise=True,
       after=_retry_logging)
and we check the flag in _retry_logging?
I think this is a good suggestion! Thank you. Fixed in a4d46a0
luigi/rpc.py
Outdated
scheduler_retry = self._get_retry_decorator()

@scheduler_retry
def __fetch(full_url, body):
Can we avoid creating this inner function every time we call _fetch? Maybe just make it a normal static method. Also, the __ prefix doesn't seem to be needed: https://en.wikipedia.org/wiki/Name_mangling#Python
@honnix I tried to fix this. Please check.
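As background on the __ prefix remark, a small sketch of Python's name mangling may help. The Fetcher class and its methods are hypothetical and exist only to show the effect; they are not taken from luigi.

```python
class Fetcher:
    """Hypothetical class used only to demonstrate name mangling."""

    def __fetch(self):
        # Two leading underscores: stored on the class as _Fetcher__fetch.
        return "mangled"

    def _fetch(self):
        # One leading underscore: stored under its own name.
        return "plain"

    def run(self):
        # Inside the class body, self.__fetch is rewritten to self._Fetcher__fetch.
        return self.__fetch()


# Collect the attribute names the class actually defines for the two methods.
names = sorted(name for name in vars(Fetcher) if "fetch" in name)
```

The single-underscore form already signals "internal" by convention, which is why the double underscore adds little here.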
test/rpc_test.py
Outdated
@@ -82,9 +74,9 @@ def test_log_rpc_retries_enabled(self, mock_logger):
         self.get_work(fetch_results)
         self.assertEqual([
             mock.call.warning('Failed connecting to remote scheduler %r', 'http://zorg.com', exc_info=True),
-            mock.call.info('Retrying attempt 2 of 3 (max)'),
+            mock.call.info('Retrying attempt 1 of 3 (max)'),
What caused this different behaviour?
This log shows the number of the "retry" attempt, whereas the original log appears to show the total number of tries (including the first attempt, which is not a retry). So I changed it.
I see. With this change, "Retrying attempt n of 3 (max)" would be wrong because attempt n has already been tried. If we still prefer the log to be issued after an attempt, the original counting looks more accurate.
@honnix Thanks for your comment! Makes sense!
I fixed this. Please check.
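The counting convention discussed in this thread can be sketched without tenacity. This is an illustration only: retry_messages is a hypothetical helper, and it assumes that no message is logged after the final attempt, which the thread does not confirm.

```python
def retry_messages(max_attempts, failures):
    """Return the messages logged when the first `failures` attempts fail."""
    messages = []
    for attempt in range(1, max_attempts + 1):
        if attempt > failures:
            break  # this attempt succeeded, so nothing more is logged
        if attempt < max_attempts:
            # Attempt `attempt` has just failed, so the upcoming try is attempt + 1.
            messages.append(
                "Retrying attempt %d of %d (max)" % (attempt + 1, max_attempts)
            )
    return messages
```

Because the message is emitted after a failure, numbering it with the upcoming attempt keeps the text accurate, which matches the original 'Retrying attempt 2 of 3 (max)' expectation after the first failure.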
test/worker_test.py
Outdated
-        except RPCError:
-            self.assertEqual(self.waits, 2)  # should attempt to add it 3 times
+        except RPCError as e:
+            self.assertTrue(e.args[0].find("Errors (3 attempts)") != -1)
str(e) can give you the message.
Thanks! fixed in 715ed32
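For context, one way to write this kind of assertion with str(e) is sketched below. RPCError here is a local stand-in for luigi's exception, and add_task and the full message text are hypothetical; only the "Errors (3 attempts)" fragment comes from the diff above.

```python
import unittest


class RPCError(Exception):
    """Local stand-in for luigi's RPCError, defined here for illustration."""


def add_task():
    # Hypothetical failure; only the "Errors (3 attempts)" part is from the thread.
    raise RPCError("Errors (3 attempts) when connecting to remote scheduler")


class RetryMessageTest(unittest.TestCase):
    def test_message_mentions_attempts(self):
        with self.assertRaises(RPCError) as ctx:
            add_task()
        # str() on the exception yields the message; assertIn reports both
        # strings on failure, which find(...) != -1 does not.
        self.assertIn("Errors (3 attempts)", str(ctx.exception))
```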
@honnix Thank you for your review. I'll rebase and fix it!
Thanks for making all the changes and improvements. Could you please rebase from the main branch?
@honnix Done!
This has brought in all commits from the main branch. Could you please rebase instead?
Force-pushed: 27fcb45 to 8f36797
Force-pushed: 8f36797 to 46a3a81
@honnix Sorry for my misunderstanding. Fixed.
format: review comment by honnix (thanks!) Co-authored-by: Honnix <honnix@users.noreply.github.com>
Thanks for the PR. Looks great!
@Tarrasch @dlstadther Would you like to take a look at this as well? Thanks.
scheduler_retry = self._get_retryer()

try:
    response = scheduler_retry(self._fetcher.fetch, full_url, body, self._connect_timeout)
For some reason I'm struggling to see where it's defined that full_url, body, and self._connect_timeout are being passed to self._fetcher.fetch() here.
Thank you for your comment!
Here is the definition of Retrying. The first argument of this class's __call__ is the function, and the following ones are its arguments.
https://github.com/jd/tenacity/blob/3e2244535ccfbb6a4b7fdd77bfc9aa61a1302302/tenacity/__init__.py#L422-L442
So the code above calls self._fetcher.fetch(full_url, body, self._connect_timeout) with retries applied.
Ah, I see. A bit obfuscated, but it is being used as tenacity expects.
Thanks!
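To make the calling convention from this exchange easier to see, here is a stdlib-only sketch. SimpleRetryer and RecordingFetcher are hypothetical stand-ins, not tenacity's Retrying or luigi's fetcher; they only mirror the "function first, then its arguments" pattern described above.

```python
class SimpleRetryer:
    """Illustrative stand-in, not tenacity's Retrying. Expects attempts >= 1."""

    def __init__(self, attempts):
        self.attempts = attempts

    def __call__(self, fn, *args, **kwargs):
        # The first positional argument is the function to run; everything
        # after it is forwarded unchanged on every attempt.
        last_error = None
        for _ in range(self.attempts):
            try:
                return fn(*args, **kwargs)
            except Exception as error:
                last_error = error
        raise last_error


class RecordingFetcher:
    """Hypothetical fetcher that records the arguments it receives."""

    def __init__(self):
        self.calls = []

    def fetch(self, full_url, body, timeout):
        self.calls.append((full_url, body, timeout))
        return "response"


fetcher = RecordingFetcher()
scheduler_retry = SimpleRetryer(attempts=3)
# Same shape as the reviewed line: the bound method first, then its arguments.
response = scheduler_retry(fetcher.fetch, "http://localhost:8082/api", b"{}", 10)
```

Passing the bound method and its arguments separately is what lets the retry object re-invoke the call on each attempt.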
Thanks @hirosassa for persisting with this tangent that I set you off on, and for being so responsive to all suggestions. 👍
Thank you for your great suggestions, too @lallea !
Description
Related to #3025
I implemented the retry procedure using the tenacity library.
https://github.com/jd/tenacity
Motivation and Context
Related to #3022: there are several naive retry implementations in luigi.
To simplify that code, I introduce the tenacity library and implement retries using a function decorator.
Have you tested this? If so, how?
I fixed and ran tests locally