Implement consistent retry policy when calling Elasticsearch #2063
Conversation
connectors/es/client.py (Outdated)
@@ -41,6 +46,10 @@ def __init__(self, config):
             use_default_ports_for_scheme=True,
         )
         self._sleeps = CancellableSleeps()
+        self._retrier = TransientElasticsearchRetrier(
+            logger, config.get("max_retries", 5), config.get("retry_timeout", 10)
I prefer to use constants like DEFAULT_MAX_RETRIES instead of the raw number. It's just easier to find and change them.
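A minimal sketch of the suggestion, assuming hypothetical constant names (the names actually used in the codebase may differ):

# Illustrative only: constant names here are assumptions, not necessarily what landed in the repo.
DEFAULT_MAX_RETRIES = 5
DEFAULT_RETRY_TIMEOUT = 10  # seconds

# ... then, inside the client's __init__ shown in the diff above:
#     self._retrier = TransientElasticsearchRetrier(
#         logger,
#         config.get("max_retries", DEFAULT_MAX_RETRIES),
#         config.get("retry_timeout", DEFAULT_RETRY_TIMEOUT),
#     )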
Added constants in a4d4f8b
Reposting my comment for the record (repeating your ideas :) ):
@@ -162,6 +165,81 @@ async def wait(self):
         await self.close()
         return False

+    async def ping(self):
+        try:
+            await self.client.info()
Why is ping not retried?
It's not retried because of the wait method: wait calls ping and does its own retrying based on its own config options. See the method:
async def wait(self):
    backoff = self.initial_backoff_duration
    # ...
    while time.time() - start < self.max_wait_duration:
        if not self._keep_waiting:
            await self.close()
            return False
        # ...
        if await self.ping():
            return True
        await self._sleeps.sleep(backoff)
        backoff *= self.backoff_multiplier
    await self.close()
    return False
As discussed in Slack, I think we should combine wait and ping, but it's totally fine to address it in a separate PR.
@seanstory @vidok @wangch079 I've addressed all the points that I've seen in your comments. Are there any other things that should be addressed before this PR can be approved?
Fantastic work 🚀 !
Left some comments, but it's totally fine to merge it now.
d0b1fdd
💔 Failed to create backport PR(s)
The backport operation could not be completed due to the following error:
The backport PRs will be merged automatically after passing CI.
To backport manually run:
Closes https://github.com/elastic/enterprise-search-team/issues/6412
This PR makes our retry policy consistent across all requests to Elasticsearch.
It's possible to specify the following settings in a config file:
The above setup states that we'll retry every call (wrapped with the retryable abstraction) 5 times, starting from 10 seconds on the first retry. Internally, the retry policy is linear, so the first retry waits 10 seconds, the second 20 seconds, and so on; a small sketch of the resulting schedule follows below.
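A minimal sketch of the resulting schedule, assuming max_retries=5 and retry_interval=10 as in the description above; the helper name is made up for illustration:

# Illustrative helper, not part of the PR: computes the linear backoff schedule
# described above (interval multiplied by the attempt number for each retry).
def linear_backoff_schedule(retry_interval, max_retries):
    return [retry_interval * attempt for attempt in range(1, max_retries + 1)]

schedule = linear_backoff_schedule(retry_interval=10, max_retries=5)
print(schedule)       # [10, 20, 30, 40, 50]
print(sum(schedule))  # 150 -- total wait across retries, i.e. 10 * (1 + 2 + 3 + 4 + 5)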
For now, absolutely every call to Elasticsearch will be retried like this (I wrapped all calls with the retrier).
Errors that are retried are:
To do the work I've introduced a TransientElasticsearchRetrier class that handles the retry logic. To retry something, you call the retrier's retry method with the coroutine you want to execute. In case the ES call requires arguments, functools.partial should be used; a hedged sketch of such a call site follows below.
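The sketch below shows what such a call site could look like; the retrier method name execute_with_retry, the fetch_doc helper, and its parameters are illustrative assumptions, not the PR's actual API:

# Illustrative only: the retrier method name is an assumption, not the PR's exact API.
from functools import partial

async def fetch_doc(es_client, retrier, index, doc_id):
    # partial binds the arguments up front so the retrier can re-create
    # the coroutine on every retry attempt.
    return await retrier.execute_with_retry(partial(es_client.get, index=index, id=doc_id))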
A separate class was created because:
- methods wrapped with the @retryable wrapper are not cancellable, so stopping a running job that's inside a retry loop will generate an error

Checklists
Pre-Review Checklist

Release Note
Improve connection resiliency when accessing Elasticsearch: retry all calls that fail because of an unhealthy Elasticsearch before terminating with an error. This logic is configurable with the newly added elasticsearch.max_retries and elasticsearch.retry_interval configuration fields. elasticsearch.bulk.max_retries was increased from 3 to 5 to match the default value of the newly added elasticsearch.max_retries. Additionally, elasticsearch.bulk.retry_interval was added with a default value of 10 seconds (previously hardcoded to 1 second) to match the behaviour of the newly added elasticsearch.retry_interval. As a result, bulk inserts will retry for up to 10 * (1 + 2 + 3 + 4 + 5) = 150 seconds; before, this number was 1 * (1 + 2 + 3) = 6 seconds.