Implement consistent retry policy when calling Elasticsearch #2063

Merged (32 commits) on Jan 29, 2024
Changes from 19 commits
Commits:
1915aee - WIP for connectors retry (artem-shelkovnikov, Jan 15, 2024)
4679b8b - Force ftest for all connectors (artem-shelkovnikov, Jan 15, 2024)
88d2362 - Decrease ES size for all ftests (artem-shelkovnikov, Jan 15, 2024)
aa60d9d - Fix linters + small bug (artem-shelkovnikov, Jan 15, 2024)
2a23340 - Add retry to one more place (artem-shelkovnikov, Jan 15, 2024)
af94a15 - Remove retryable (artem-shelkovnikov, Jan 15, 2024)
7b7482b - Make logs a bit easier to read (artem-shelkovnikov, Jan 15, 2024)
989ffdc - Fix some tests, improve logging, make it work even better (artem-shelkovnikov, Jan 16, 2024)
edc9518 - Actually fix linting + small refactoring (artem-shelkovnikov, Jan 16, 2024)
65bcf56 - Fix linting (artem-shelkovnikov, Jan 16, 2024)
658ba84 - Return instance sizes (artem-shelkovnikov, Jan 16, 2024)
fbfdd40 - Add tests for retrier + rename it (artem-shelkovnikov, Jan 17, 2024)
6f67dbe - Properly read bulk config (artem-shelkovnikov, Jan 18, 2024)
d009e43 - Remove defaults (artem-shelkovnikov, Jan 18, 2024)
9910c4c - Some cleanups (artem-shelkovnikov, Jan 18, 2024)
4d3d441 - Remove change in Dockerfile.ftest (artem-shelkovnikov, Jan 18, 2024)
50ba7e3 - Fix missing comma (artem-shelkovnikov, Jan 18, 2024)
f9fbe16 - Return default (artem-shelkovnikov, Jan 18, 2024)
edc1e2a - Merge branch 'main' into artem/actually-retry-es-calls (artem-shelkovnikov, Jan 22, 2024)
3a111f1 - Retry more error codes (artem-shelkovnikov, Jan 24, 2024)
45a3b5e - Rename retry_timeout -> retry_interval (artem-shelkovnikov, Jan 24, 2024)
a4d4f8b - Add constants + deprecation message (artem-shelkovnikov, Jan 24, 2024)
8e431d5 - Autoformat (artem-shelkovnikov, Jan 24, 2024)
c405831 - Merge branch 'main' into artem/actually-retry-es-calls (artem-shelkovnikov, Jan 25, 2024)
4be0bd8 - Return elasticsearch.bulk.max_retries back (artem-shelkovnikov, Jan 26, 2024)
d0b1fdd - Remove deprecation notices for now (artem-shelkovnikov, Jan 26, 2024)
fe133e8 - Improve bulk insert retryable to make it work same like in other retries (artem-shelkovnikov, Jan 26, 2024)
e4cbd88 - Add config options into yml file (artem-shelkovnikov, Jan 26, 2024)
4c12c18 - Merge branch 'main' into artem/actually-retry-es-calls (artem-shelkovnikov, Jan 26, 2024)
244ce2e - Address comments (artem-shelkovnikov, Jan 29, 2024)
faf1479 - Fix linters (artem-shelkovnikov, Jan 29, 2024)
2d64473 - Merge branch 'main' into artem/actually-retry-es-calls (artem-shelkovnikov, Jan 29, 2024)
connectors/config.py (2 additions, 1 deletion)

@@ -59,8 +59,9 @@ def _default_config():
"max_concurrency": 5,
"chunk_max_mem_size": 5,
"concurrent_downloads": 10,
"max_retries": 3,
},
"max_retries": 5,
"retry_timeout": 10,
"retry_on_timeout": True,
"request_timeout": 120,
"max_wait_duration": 120,
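These defaults feed two different retry paths: bulk.max_retries is the knob for the bulk ingestion path, while the top-level max_retries and retry_timeout are the values read by the new TransientElasticsearchRetrier in connectors/es/client.py below. As a rough sketch of the resulting defaults, assuming the keys sit under the "elasticsearch" section of _default_config() as elsewhere in the file (only the values visible in the diff above are taken verbatim; everything else is abbreviated):

```python
# Sketch only: retry-related defaults after this change. The nesting under
# an "elasticsearch" key and the omitted sections are assumptions.
def _default_config():
    return {
        "elasticsearch": {
            "bulk": {
                "max_concurrency": 5,
                "chunk_max_mem_size": 5,
                "concurrent_downloads": 10,
                "max_retries": 3,       # retries on the bulk ingestion path
            },
            "max_retries": 5,           # retries per individual ES API call
            "retry_timeout": 10,        # base sleep (seconds) between retries
            "retry_on_timeout": True,
            "request_timeout": 120,
            "max_wait_duration": 120,
        },
        # ... remaining sections unchanged
    }
```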
connectors/es/client.py (88 additions, 17 deletions)

@@ -9,6 +9,7 @@
import time
from enum import Enum

from elastic_transport import ConnectionTimeout
from elastic_transport.client_utils import url_to_node_config
from elasticsearch import ApiError, AsyncElasticsearch, ConflictError
from elasticsearch import (
@@ -17,7 +18,11 @@

from connectors import __version__
from connectors.logger import logger, set_extra_logger
from connectors.utils import CancellableSleeps
from connectors.utils import (
CancellableSleeps,
RetryStrategy,
time_to_sleep_between_retries,
)


class License(Enum):
@@ -41,6 +46,10 @@ def __init__(self, config):
use_default_ports_for_scheme=True,
)
self._sleeps = CancellableSleeps()
self._retrier = TransientElasticsearchRetrier(
logger, config.get("max_retries", 5), config.get("retry_timeout", 10)
)

Review comment (Contributor):
I prefer to use constants like DEFAULT_MAX_RETRIES instead of the raw number. It's just easier to find and change them.

Reply (Member Author):
Added constants in a4d4f8b

options = {
"hosts": [self.host],
"request_timeout": config.get("request_timeout", 120),
@@ -100,7 +109,9 @@ async def has_active_license_enabled(self, license_):
Tuple: (boolean if `license_` is enabled and not expired, actual license Elasticsearch is using)
"""

license_response = await self.client.license.get()
license_response = await self._retrier.execute_with_retry(
self.client.license.get
)
license_info = license_response.get("license", {})
is_expired = license_info.get("status", "").lower() == "expired"

@@ -125,23 +136,9 @@
)

async def close(self):
await self._retrier.close()
await self.client.close()

async def ping(self):
try:
await self.client.info()
except ApiError as e:
logger.error(f"The server returned a {e.status_code} code")
if e.info is not None and "error" in e.info and "reason" in e.info["error"]:
logger.error(e.info["error"]["reason"])
return False
except ElasticConnectionError as e:
logger.error("Could not connect to the server")
if e.message is not None:
logger.error(e.message)
return False
return True

async def wait(self):
backoff = self.initial_backoff_duration
start = time.time()
@@ -162,6 +159,80 @@ async def wait(self):
await self.close()
return False

async def ping(self):
try:
await self.client.info()
Review comment (Member):
Why is ping not retried?

Reply (Member Author):
It's not retried because of the wait method: wait calls ping and does its own retrying based on its own config options. See the method:

    async def wait(self):
        backoff = self.initial_backoff_duration
        # ...
        while time.time() - start < self.max_wait_duration:
            if not self._keep_waiting:
                await self.close()
                return False

            # ...
            if await self.ping():
                return True
            await self._sleeps.sleep(backoff)
            backoff *= self.backoff_multiplier

        await self.close()
        return False

Reply (Member):
As discussed in Slack, I think we should combine wait and ping, but it's totally fine to address it in a separate PR.

except ApiError as e:
logger.error(f"The server returned a {e.status_code} code")
if e.info is not None and "error" in e.info and "reason" in e.info["error"]:
logger.error(e.info["error"]["reason"])
return False
except ElasticConnectionError as e:
logger.error("Could not connect to the server")
if e.message is not None:
logger.error(e.message)
return False
return True


class RetryInterruptedError(Exception):
pass


class TransientElasticsearchRetrier:
def __init__(
self,
logger_,
max_retries,
retry_timeout,
retry_strategy=RetryStrategy.LINEAR_BACKOFF,
):
self._logger = logger_
self._sleeps = CancellableSleeps()
self._keep_retrying = True
self._max_retries = max_retries
self._retry_timeout = retry_timeout
self._retry_strategy = retry_strategy

async def close(self):
self._sleeps.cancel()
self._keep_retrying = False

async def _sleep(self, retry):
time_to_sleep = time_to_sleep_between_retries(
self._retry_strategy, self._retry_timeout, retry
)
self._logger.debug(f"Attempt {retry}: sleeping for {time_to_sleep}")
await self._sleeps.sleep(time_to_sleep)

async def execute_with_retry(self, func):
retry = 0
while self._keep_retrying and retry < self._max_retries:
retry += 1
try:
result = await func()

return result
except ConnectionTimeout:
self._logger.debug(f"Attempt {retry}: connection timeout")

if retry >= self._max_retries:
raise
except ApiError as e:
self._logger.debug(
f"Attempt {retry}: api error with status {e.status_code}"
)

if e.status_code != 429:
raise
if retry >= self._max_retries:
raise

await self._sleep(retry)

msg = "Retry operation was interrupted"
raise RetryInterruptedError(msg)


def with_concurrency_control(retries=3):
def wrapper(func):
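To make the new retry path concrete, here is a small, self-contained sketch of how TransientElasticsearchRetrier behaves. The flaky coroutine and the asyncio driver are illustrative only; it assumes the connectors package and elastic_transport are importable, and that connectors.utils.time_to_sleep_between_retries grows the sleep roughly linearly with the attempt number for LINEAR_BACKOFF.

```python
# Illustrative sketch: exercises TransientElasticsearchRetrier against a fake
# coroutine instead of a real Elasticsearch client.
import asyncio
from functools import partial

from elastic_transport import ConnectionTimeout

from connectors.es.client import TransientElasticsearchRetrier
from connectors.logger import logger

attempts = {"count": 0}


async def flaky_info(index):
    # Fails twice with a transport timeout, then succeeds.
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionTimeout("simulated timeout")
    return {"index": index, "status": "green"}


async def main():
    retrier = TransientElasticsearchRetrier(logger, max_retries=5, retry_timeout=1)
    # execute_with_retry takes a zero-argument callable, so arguments are
    # bound up-front with functools.partial.
    result = await retrier.execute_with_retry(partial(flaky_info, "search-demo"))
    print(result)  # succeeds on the third attempt
    await retrier.close()


asyncio.run(main())
```

Note that execute_with_retry only retries ConnectionTimeout and ApiError with status 429; any other ApiError propagates immediately, and close() cancels a pending backoff sleep so shutdown is not blocked by a retry in flight.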
connectors/es/management_client.py (69 additions, 25 deletions)

@@ -4,6 +4,8 @@
# you may not use this file except in compliance with the Elastic License 2.0.
#

from functools import partial

from elasticsearch import (
NotFoundError as ElasticNotFoundError,
)
@@ -39,34 +41,48 @@ async def ensure_exists(self, indices=None):

for index in indices:
logger.debug(f"Checking index {index}")
if not await self.client.indices.exists(index=index):
await self.client.indices.create(index=index)
if not await self._retrier.execute_with_retry(
partial(self.client.indices.exists, index=index)
):
await self._retrier.execute_with_retry(
partial(self.client.indices.create, index=index)
)
logger.debug(f"Created index {index}")

async def create_content_index(self, search_index_name, language_code):
settings = Settings(language_code=language_code, analysis_icu=False).to_hash()
mappings = Mappings.default_text_fields_mappings(is_connectors_index=True)

return await self.client.indices.create(
index=search_index_name, mappings=mappings, settings=settings
return await self._retrier.execute_with_retry(
partial(
self.client.indices.create,
index=search_index_name,
mappings=mappings,
settings=settings,
)
)

async def ensure_content_index_mappings(self, index, mappings):
# open = Match open, non-hidden indices. Also matches any non-hidden data stream.
# Content indices are always non-hidden.
response = await self.client.indices.get_mapping(index=index)
response = await self._retrier.execute_with_retry(
partial(self.client.indices.get_mapping, index=index)
)

existing_mappings = response[index].get("mappings", {})
if len(existing_mappings) == 0:
if mappings:
logger.debug(
"Index %s has no mappings or it's empty. Adding mappings...", index
)
await self.client.indices.put_mapping(
index=index,
dynamic=mappings.get("dynamic", False),
dynamic_templates=mappings.get("dynamic_templates", []),
properties=mappings.get("properties", {}),
await self._retrier.execute_with_retry(
partial(
self.client.indices.put_mapping,
index=index,
dynamic=mappings.get("dynamic", False),
dynamic_templates=mappings.get("dynamic_templates", []),
properties=mappings.get("properties", {}),
)
)
logger.debug("Successfully added mappings for index %s", index)
else:
@@ -82,34 +98,62 @@ async def ensure_ingest_pipeline_exists(
self, pipeline_id, version, description, processors
):
try:
await self.client.ingest.get_pipeline(id=pipeline_id)
await self._retrier.execute_with_retry(
partial(self.client.ingest.get_pipeline, id=pipeline_id)
)
except ElasticNotFoundError:
await self.client.ingest.put_pipeline(
id=pipeline_id,
version=version,
description=description,
processors=processors,
await self._retrier.execute_with_retry(
partial(
self.client.ingest.put_pipeline,
id=pipeline_id,
version=version,
description=description,
processors=processors,
)
)

async def delete_indices(self, indices):
await self.client.indices.delete(index=indices, ignore_unavailable=True)
await self._retrier.execute_with_retry(
partial(self.client.indices.delete, index=indices, ignore_unavailable=True)
)

async def clean_index(self, index_name):
return await self.client.delete_by_query(
index=index_name, body={"query": {"match_all": {}}}, ignore_unavailable=True
return await self._retrier.execute_with_retry(
partial(
self.client.delete_by_query,
index=index_name,
body={"query": {"match_all": {}}},
ignore_unavailable=True,
)
)

async def list_indices(self):
return await self.client.indices.stats(index="search-*")
return await self._retrier.execute_with_retry(
partial(self.client.indices.stats, index="search-*")
)

async def index_exists(self, index_name):
return await self.client.indices.exists(index=index_name)
return await self._retrier.execute_with_retry(
partial(self.client.indices.exists, index=index_name)
)

async def upsert(self, _id, index_name, doc):
await self.client.index(
id=_id,
index=index_name,
document=doc,
return await self._retrier.execute_with_retry(
partial(
self.client.index,
id=_id,
index=index_name,
document=doc,
)
)

async def bulk_insert(self, operations, pipeline):
return await self._retrier.execute_with_retry(
partial(
self.client.bulk,
operations=operations,
pipeline=pipeline,
)
)

async def yield_existing_documents_metadata(self, index):
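Every call in this file is wrapped in functools.partial rather than invoked directly because execute_with_retry needs a zero-argument callable it can call again on each attempt; awaiting something like self.client.indices.exists(index=index) up front would produce a single coroutine object that cannot be re-awaited after a failure. A minimal, self-contained sketch of the pattern, where do_request and call_with_one_retry are hypothetical stand-ins rather than connectors APIs:

```python
# Illustrative sketch of the functools.partial pattern used above.
import asyncio
from functools import partial


async def do_request(index, ignore_unavailable=False):
    # Stand-in for something like self.client.indices.delete(index=..., ...)
    return {"index": index, "ignore_unavailable": ignore_unavailable}


async def call_with_one_retry(factory):
    # factory() builds a *fresh* coroutine per attempt, which is what makes
    # retrying possible; a coroutine object itself can only be awaited once.
    try:
        return await factory()
    except Exception:
        return await factory()


async def main():
    factory = partial(do_request, "search-demo", ignore_unavailable=True)
    print(await call_with_one_retry(factory))


asyncio.run(main())
```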
connectors/es/sink.py (3 additions, 8 deletions)

@@ -43,7 +43,6 @@
aenumerate,
get_size,
iso_utc,
retryable,
)

__all__ = ["SyncOrchestrator"]
@@ -130,20 +129,16 @@ def _bulk_op(self, doc, operation=OP_INDEX):

@tracer.start_as_current_span("_bulk API call", slow_log=1.0)
async def _batch_bulk(self, operations, stats):
@retryable(retries=self.max_retires)
async def _bulk_api_call():
return await self.client.client.bulk(
operations=operations, pipeline=self.pipeline["name"]
)

# TODO: treat result to retry errors like in async_streaming_bulk
task_num = len(self.bulk_tasks)

if self._logger.isEnabledFor(logging.DEBUG):
self._logger.debug(
f"Task {task_num} - Sending a batch of {len(operations)} ops -- {get_mb_size(operations)}MiB"
)
res = await _bulk_api_call()

# TODO: retry 429s for individual items here
Review comment (Contributor):
Could you please expand more on this comment? I don't fully understand how we can handle errors for individual items.

Reply (Member Author):
Right now we send a batch of, say, 500 items, and if one fails we retry the whole batch.

What I want to do in later PRs:

- See if the error is retriable. If so, put the item back on the queue to re-ingest in a different batch.
- If the error is not retriable, raise following the resilience protocol we have (don't raise if the number of errors during ingestion does not exceed a certain percentage, so if 0.5% of items are not ingested we don't raise, but at 1% we do raise).
- Maybe have some sort of "dead letter queue" for these items (which could happen because of mappings blowing up with too many generated fields and such). This I probably won't do without extensive discussion about whether we even want it.

Reply (Contributor):
Re: "Maybe have some sort of dead letter queue for these items" - I think it's a good way to handle it, but we can discuss it later. Thanks for the details 👍🏻
res = await self.client.bulk_insert(operations, self.pipeline["name"])
if res.get("errors"):
for item in res["items"]:
for op, data in item.items():
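The TODO above ("retry 429s for individual items here") and the review thread describe follow-up work, not something this PR implements. As a rough illustration of what per-item triage of a bulk response could look like, here is a hedged sketch; RETRIABLE_STATUSES and the requeue callback are hypothetical names, and the item layout follows the standard Elasticsearch bulk response shape.

```python
# Hypothetical follow-up sketch, not part of this PR: split items of an
# Elasticsearch bulk response into successes, retriable failures (429, pushed
# back onto an ingestion queue), and hard failures.
RETRIABLE_STATUSES = {429}


def triage_bulk_response(res, requeue):
    """Return (successful, retried, failed) counts for a bulk response."""
    successful, retried, failed = 0, 0, 0
    for item in res.get("items", []):
        for op, data in item.items():
            if "error" not in data:
                successful += 1
            elif data.get("status") in RETRIABLE_STATUSES:
                requeue(data.get("_id"))  # re-ingest in a later batch
                retried += 1
            else:
                failed += 1
    return successful, retried, failed
```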