From a64cd078f961e7b0d663d328b6b37099ef09517b Mon Sep 17 00:00:00 2001
From: Aaron Hoffer <4275843+ayayron@users.noreply.github.com>
Date: Tue, 12 Nov 2024 04:01:05 -0800
Subject: [PATCH] Allow retries for statuses other than 429 in streaming bulk
 (#2071)

Co-authored-by: Miguel Grinberg
Co-authored-by: Quentin Pradet
(cherry picked from commit 51aaccedd660ba7a008facd303f8bfe909427273)
---
 elasticsearch/_async/helpers.py                    | 29 +++++++++-----
 elasticsearch/helpers/actions.py                   | 29 +++++++++-----
 .../test_async/test_server/test_helpers.py         | 39 +++++++++++++++++++
 .../test_server/test_helpers.py                    | 39 +++++++++++++++++++
 4 files changed, 116 insertions(+), 20 deletions(-)

diff --git a/elasticsearch/_async/helpers.py b/elasticsearch/_async/helpers.py
index 1ab55850b..1bc339917 100644
--- a/elasticsearch/_async/helpers.py
+++ b/elasticsearch/_async/helpers.py
@@ -173,6 +173,7 @@ async def async_streaming_bulk(
     max_backoff: float = 600,
     yield_ok: bool = True,
     ignore_status: Union[int, Collection[int]] = (),
+    retry_on_status: Union[int, Collection[int]] = (429,),
     *args: Any,
     **kwargs: Any,
 ) -> AsyncIterable[Tuple[bool, Dict[str, Any]]]:
@@ -184,10 +185,11 @@ async def async_streaming_bulk(
     entire input is consumed and sent.
 
     If you specify ``max_retries`` it will also retry any documents that were
-    rejected with a ``429`` status code. To do this it will wait (**by calling
-    asyncio.sleep**) for ``initial_backoff`` seconds and then,
-    every subsequent rejection for the same chunk, for double the time every
-    time up to ``max_backoff`` seconds.
+    rejected with a ``429`` status code. Use ``retry_on_status`` to
+    configure which status codes will be retried. To do this it will wait
+    (**by calling asyncio.sleep which will block**) for ``initial_backoff`` seconds
+    and then, every subsequent rejection for the same chunk, for double the time
+    every time up to ``max_backoff`` seconds.
 
     :arg client: instance of :class:`~elasticsearch.AsyncElasticsearch` to use
     :arg actions: iterable or async iterable containing the actions to be executed
@@ -200,8 +202,11 @@ async def async_streaming_bulk(
     :arg expand_action_callback: callback executed on each action passed in,
         should return a tuple containing the action line and the data line
         (`None` if data line should be omitted).
+    :arg retry_on_status: HTTP status code that will trigger a retry.
+        (if `None` is specified only status 429 will retry).
     :arg max_retries: maximum number of times a document will be retried when
-        ``429`` is received, set to 0 (default) for no retries on ``429``
+        retry_on_status (defaulting to ``429``) is received,
+        set to 0 (default) for no retries
     :arg initial_backoff: number of seconds we should wait before the first
         retry. Any subsequent retries will be powers of ``initial_backoff *
         2**retry_number``
@@ -213,6 +218,9 @@ async def async_streaming_bulk(
     client = client.options()
     client._client_meta = (("h", "bp"),)
 
+    if isinstance(retry_on_status, int):
+        retry_on_status = (retry_on_status,)
+
     async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]:
         async for item in aiter(actions):
             yield expand_action_callback(item)
@@ -264,11 +272,11 @@ async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]:
                 ):
                     if not ok:
                         action, info = info.popitem()
-                        # retry if retries enabled, we get 429, and we are not
-                        # in the last attempt
+                        # retry if retries enabled, we are not in the last attempt,
+                        # and status in retry_on_status (defaulting to 429)
                         if (
                             max_retries
-                            and info["status"] == 429
+                            and info["status"] in retry_on_status
                             and (attempt + 1) <= max_retries
                         ):
                             # _process_bulk_chunk expects strings so we need to
@@ -281,8 +289,9 @@ async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]:
                             yield ok, info
 
             except ApiError as e:
-                # suppress 429 errors since we will retry them
-                if attempt == max_retries or e.status_code != 429:
+                # suppress any status in retry_on_status (429 by default)
+                # since we will retry them
+                if attempt == max_retries or e.status_code not in retry_on_status:
                     raise
             else:
                 if not to_retry:
diff --git a/elasticsearch/helpers/actions.py b/elasticsearch/helpers/actions.py
index 1d6b0a27e..687bf4b84 100644
--- a/elasticsearch/helpers/actions.py
+++ b/elasticsearch/helpers/actions.py
@@ -374,6 +374,7 @@ def streaming_bulk(
     max_backoff: float = 600,
     yield_ok: bool = True,
     ignore_status: Union[int, Collection[int]] = (),
+    retry_on_status: Union[int, Collection[int]] = (429,),
     span_name: str = "helpers.streaming_bulk",
     *args: Any,
     **kwargs: Any,
 ) -> Iterable[Tuple[bool, Dict[str, Any]]]:
@@ -386,10 +387,11 @@ def streaming_bulk(
     entire input is consumed and sent.
 
     If you specify ``max_retries`` it will also retry any documents that were
-    rejected with a ``429`` status code. To do this it will wait (**by calling
-    time.sleep which will block**) for ``initial_backoff`` seconds and then,
-    every subsequent rejection for the same chunk, for double the time every
-    time up to ``max_backoff`` seconds.
+    rejected with a ``429`` status code. Use ``retry_on_status`` to
+    configure which status codes will be retried. To do this it will wait
+    (**by calling time.sleep which will block**) for ``initial_backoff`` seconds
+    and then, every subsequent rejection for the same chunk, for double the time
+    every time up to ``max_backoff`` seconds.
 
     :arg client: instance of :class:`~elasticsearch.Elasticsearch` to use
     :arg actions: iterable containing the actions to be executed
@@ -402,8 +404,11 @@ def streaming_bulk(
     :arg expand_action_callback: callback executed on each action passed in,
         should return a tuple containing the action line and the data line
         (`None` if data line should be omitted).
+    :arg retry_on_status: HTTP status code that will trigger a retry.
+        (if `None` is specified only status 429 will retry).
     :arg max_retries: maximum number of times a document will be retried when
-        ``429`` is received, set to 0 (default) for no retries on ``429``
+        retry_on_status (defaulting to ``429``) is received,
+        set to 0 (default) for no retries
     :arg initial_backoff: number of seconds we should wait before the first
         retry. Any subsequent retries will be powers of ``initial_backoff *
         2**retry_number``
@@ -415,6 +420,9 @@ def streaming_bulk(
     client = client.options()
     client._client_meta = (("h", "bp"),)
 
+    if isinstance(retry_on_status, int):
+        retry_on_status = (retry_on_status,)
+
     serializer = client.transport.serializers.get_serializer("application/json")
 
     bulk_data: List[
@@ -458,11 +466,11 @@ def streaming_bulk(
                 ):
                     if not ok:
                         action, info = info.popitem()
-                        # retry if retries enabled, we get 429, and we are not
-                        # in the last attempt
+                        # retry if retries enabled, we are not in the last attempt,
+                        # and status in retry_on_status (defaulting to 429)
                         if (
                             max_retries
-                            and info["status"] == 429
+                            and info["status"] in retry_on_status
                             and (attempt + 1) <= max_retries
                         ):
                             # _process_bulk_chunk expects bytes so we need to
@@ -475,8 +483,9 @@ def streaming_bulk(
                             yield ok, info
 
             except ApiError as e:
-                # suppress 429 errors since we will retry them
-                if attempt == max_retries or e.status_code != 429:
+                # suppress any status in retry_on_status (429 by default)
+                # since we will retry them
+                if attempt == max_retries or e.status_code not in retry_on_status:
                     raise
             else:
                 if not to_retry:
diff --git a/test_elasticsearch/test_async/test_server/test_helpers.py b/test_elasticsearch/test_async/test_server/test_helpers.py
index 746dc1028..0bb781304 100644
--- a/test_elasticsearch/test_async/test_server/test_helpers.py
+++ b/test_elasticsearch/test_async/test_server/test_helpers.py
@@ -293,6 +293,45 @@ async def streaming_bulk():
             await streaming_bulk()
         assert 4 == failing_client._called
 
+    async def test_connection_timeout_is_retried_with_retry_status_callback(
+        self, async_client
+    ):
+        failing_client = FailingBulkClient(
+            async_client,
+            fail_with=ApiError(
+                message="Connection timed out!",
+                body={},
+                meta=ApiResponseMeta(
+                    status=522, headers={}, http_version="1.1", duration=0, node=None
+                ),
+            ),
+        )
+        docs = [
+            {"_index": "i", "_id": 47, "f": "v"},
+            {"_index": "i", "_id": 45, "f": "v"},
+            {"_index": "i", "_id": 42, "f": "v"},
+        ]
+
+        results = [
+            x
+            async for x in helpers.async_streaming_bulk(
+                failing_client,
+                docs,
+                raise_on_exception=False,
+                raise_on_error=False,
+                chunk_size=1,
+                retry_on_status=522,
+                max_retries=1,
+                initial_backoff=0,
+            )
+        ]
+        assert 3 == len(results)
+        assert [True, True, True] == [r[0] for r in results]
+        await async_client.indices.refresh(index="i")
+        res = await async_client.search(index="i")
+        assert {"value": 3, "relation": "eq"} == res["hits"]["total"]
+        assert 4 == failing_client._called
+
 
 class TestBulk:
     async def test_bulk_works_with_single_item(self, async_client):
diff --git a/test_elasticsearch/test_server/test_helpers.py b/test_elasticsearch/test_server/test_helpers.py
index 011803bc9..6ed43e2af 100644
--- a/test_elasticsearch/test_server/test_helpers.py
+++ b/test_elasticsearch/test_server/test_helpers.py
@@ -288,6 +288,45 @@ def streaming_bulk():
     assert 4 == failing_client._called
 
 
+def test_connection_timeout_is_retried_with_retry_status_callback(sync_client):
+    failing_client = FailingBulkClient(
+        sync_client,
+        fail_with=ApiError(
+            message="Connection timed out!",
+            body={},
+            meta=ApiResponseMeta(
+                status=522, headers={}, http_version="1.1", duration=0, node=None
+            ),
+        ),
+    )
+    docs = [
+        {"_index": "i", "_id": 47, "f": "v"},
+        {"_index": "i", "_id": 45, "f": "v"},
+        {"_index": "i", "_id": 42, "f": "v"},
+    ]
+
+    results = list(
+        helpers.streaming_bulk(
+            failing_client,
+            docs,
+            index="i",
+            raise_on_exception=False,
+            raise_on_error=False,
+            chunk_size=1,
+            retry_on_status=522,
+            max_retries=1,
+            initial_backoff=0,
+        )
+    )
+    assert 3 == len(results)
+    print(results)
+    assert [True, True, True] == [r[0] for r in results]
+    sync_client.indices.refresh(index="i")
+    res = sync_client.search(index="i")
+    assert {"value": 3, "relation": "eq"} == res["hits"]["total"]
+    assert 4 == failing_client._called
+
+
 def test_bulk_works_with_single_item(sync_client):
    docs = [{"answer": 42, "_id": 1}]
    success, failed = helpers.bulk(sync_client, docs, index="test-index", refresh=True)
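
Note (not part of the patch): once this change is applied, the new ``retry_on_status`` option is used as in the minimal sketch below. The cluster URL, index name, and document shape are placeholders chosen for illustration; everything else follows the helper's documented parameters.

from elasticsearch import Elasticsearch, helpers

# Placeholder connection details; point this at your own cluster.
client = Elasticsearch("http://localhost:9200")

# Generator of bulk actions; "my-index" is an assumed index name.
docs = ({"_index": "my-index", "_id": i, "value": i} for i in range(1000))

# retry_on_status widens the default (429,) so that 503 responses are also
# retried, up to max_retries times with exponential backoff between attempts.
for ok, item in helpers.streaming_bulk(
    client,
    docs,
    chunk_size=500,
    max_retries=3,
    retry_on_status=(429, 503),
    initial_backoff=2,
    raise_on_error=False,
):
    if not ok:
        print("failed:", item)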