[Backport 8.x] Allow retries for statuses other than 429 in streaming bulk #2701

Merged 1 commit on Nov 12, 2024
29 changes: 19 additions & 10 deletions elasticsearch/_async/helpers.py
@@ -173,6 +173,7 @@ async def async_streaming_bulk(
max_backoff: float = 600,
yield_ok: bool = True,
ignore_status: Union[int, Collection[int]] = (),
retry_on_status: Union[int, Collection[int]] = (429,),
*args: Any,
**kwargs: Any,
) -> AsyncIterable[Tuple[bool, Dict[str, Any]]]:
@@ -184,10 +185,11 @@ async def async_streaming_bulk(
entire input is consumed and sent.

If you specify ``max_retries`` it will also retry any documents that were
rejected with a ``429`` status code. To do this it will wait (**by calling
asyncio.sleep**) for ``initial_backoff`` seconds and then,
every subsequent rejection for the same chunk, for double the time every
time up to ``max_backoff`` seconds.
rejected with a ``429`` status code. Use ``retry_on_status`` to
configure which status codes will be retried. To do this it will wait
(**by awaiting asyncio.sleep**) for ``initial_backoff`` seconds and then,
on every subsequent rejection of the same chunk, double that wait time,
up to ``max_backoff`` seconds.

:arg client: instance of :class:`~elasticsearch.AsyncElasticsearch` to use
:arg actions: iterable or async iterable containing the actions to be executed
@@ -200,8 +202,11 @@
:arg expand_action_callback: callback executed on each action passed in,
should return a tuple containing the action line and the data line
(`None` if data line should be omitted).
:arg retry_on_status: HTTP status code(s) that will trigger a retry;
accepts a single code or a collection of codes (defaults to ``429``).
:arg max_retries: maximum number of times a document will be retried when
``429`` is received, set to 0 (default) for no retries on ``429``
a status listed in ``retry_on_status`` (defaulting to ``429``) is received;
set to 0 (default) for no retries
:arg initial_backoff: number of seconds we should wait before the first
retry. Any subsequent retries will be powers of ``initial_backoff *
2**retry_number``
@@ -213,6 +218,9 @@
client = client.options()
client._client_meta = (("h", "bp"),)

if isinstance(retry_on_status, int):
retry_on_status = (retry_on_status,)

async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]:
async for item in aiter(actions):
yield expand_action_callback(item)
@@ -264,11 +272,11 @@ async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]:
):
if not ok:
action, info = info.popitem()
# retry if retries enabled, we get 429, and we are not
# in the last attempt
# retry if retries enabled, we are not in the last attempt,
# and status in retry_on_status (defaulting to 429)
if (
max_retries
and info["status"] == 429
and info["status"] in retry_on_status
and (attempt + 1) <= max_retries
):
# _process_bulk_chunk expects strings so we need to
@@ -281,8 +289,9 @@ async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]:
yield ok, info

except ApiError as e:
# suppress 429 errors since we will retry them
if attempt == max_retries or e.status_code != 429:
# suppress any status in retry_on_status (429 by default)
# since we will retry them
if attempt == max_retries or e.status_code not in retry_on_status:
raise
else:
if not to_retry:
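For context, here is a minimal usage sketch of the new parameter on the async helper. The endpoint URL, index name, document shape, and the (429, 503) status list are illustrative assumptions, not part of this change.

import asyncio

from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_streaming_bulk


async def index_everything() -> None:
    client = AsyncElasticsearch("http://localhost:9200")
    # Plain dicts work as actions; _index and _id route each document.
    actions = ({"_index": "my-index", "_id": i, "value": i} for i in range(1_000))

    # Retry chunks rejected with 429 or 503 up to five times, waiting
    # 2s, 4s, 8s, ... (capped at 60s) between attempts on the same chunk.
    async for ok, item in async_streaming_bulk(
        client,
        actions,
        chunk_size=500,
        max_retries=5,
        initial_backoff=2,
        max_backoff=60,
        retry_on_status=(429, 503),
        raise_on_error=False,  # yield failures instead of raising BulkIndexError
    ):
        if not ok:
            print("failed:", item)

    await client.close()


asyncio.run(index_everything())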
29 changes: 19 additions & 10 deletions elasticsearch/helpers/actions.py
@@ -374,6 +374,7 @@ def streaming_bulk(
max_backoff: float = 600,
yield_ok: bool = True,
ignore_status: Union[int, Collection[int]] = (),
retry_on_status: Union[int, Collection[int]] = (429,),
span_name: str = "helpers.streaming_bulk",
*args: Any,
**kwargs: Any,
@@ -386,10 +387,11 @@ def streaming_bulk(
entire input is consumed and sent.

If you specify ``max_retries`` it will also retry any documents that were
rejected with a ``429`` status code. To do this it will wait (**by calling
time.sleep which will block**) for ``initial_backoff`` seconds and then,
every subsequent rejection for the same chunk, for double the time every
time up to ``max_backoff`` seconds.
rejected with a ``429`` status code. Use ``retry_on_status`` to
configure which status codes will be retried. To do this it will wait
(**by calling time.sleep which will block**) for ``initial_backoff`` seconds
and then, on every subsequent rejection of the same chunk, double that wait
time, up to ``max_backoff`` seconds.

:arg client: instance of :class:`~elasticsearch.Elasticsearch` to use
:arg actions: iterable containing the actions to be executed
@@ -402,8 +404,11 @@
:arg expand_action_callback: callback executed on each action passed in,
should return a tuple containing the action line and the data line
(`None` if data line should be omitted).
:arg retry_on_status: HTTP status code(s) that will trigger a retry;
accepts a single code or a collection of codes (defaults to ``429``).
:arg max_retries: maximum number of times a document will be retried when
``429`` is received, set to 0 (default) for no retries on ``429``
a status listed in ``retry_on_status`` (defaulting to ``429``) is received;
set to 0 (default) for no retries
:arg initial_backoff: number of seconds we should wait before the first
retry. Any subsequent retries will be powers of ``initial_backoff *
2**retry_number``
@@ -415,6 +420,9 @@
client = client.options()
client._client_meta = (("h", "bp"),)

if isinstance(retry_on_status, int):
retry_on_status = (retry_on_status,)

serializer = client.transport.serializers.get_serializer("application/json")

bulk_data: List[
@@ -458,11 +466,11 @@ def streaming_bulk(
):
if not ok:
action, info = info.popitem()
# retry if retries enabled, we get 429, and we are not
# in the last attempt
# retry if retries enabled, we are not in the last attempt,
# and status in retry_on_status (defaulting to 429)
if (
max_retries
and info["status"] == 429
and info["status"] in retry_on_status
and (attempt + 1) <= max_retries
):
# _process_bulk_chunk expects bytes so we need to
@@ -475,8 +483,9 @@
yield ok, info

except ApiError as e:
# suppress 429 errors since we will retry them
if attempt == max_retries or e.status_code != 429:
# suppress any status in retry_on_status (429 by default)
# since we will retry them
if attempt == max_retries or e.status_code not in retry_on_status:
raise
else:
if not to_retry:
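The synchronous helper mirrors this behaviour with a blocking time.sleep between attempts. A hedged sketch under the same assumptions (URL, index name, and the extra 503 status are illustrative):

from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk

client = Elasticsearch("http://localhost:9200")


def generate_actions():
    # Yield plain dicts; streaming_bulk chunks and sends them for us.
    for i in range(10_000):
        yield {"_index": "my-index", "_id": i, "value": i}


# With initial_backoff=2 the helper sleeps roughly 2s, 4s, 8s, ...
# (never longer than max_backoff) before re-sending a rejected chunk.
for ok, item in streaming_bulk(
    client,
    generate_actions(),
    chunk_size=500,
    max_retries=3,
    initial_backoff=2,
    max_backoff=600,
    retry_on_status=(429, 503),
    raise_on_error=False,
):
    if not ok:
        print("document failed after retries:", item)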
39 changes: 39 additions & 0 deletions test_elasticsearch/test_async/test_server/test_helpers.py
@@ -293,6 +293,45 @@ async def streaming_bulk():
await streaming_bulk()
assert 4 == failing_client._called

async def test_connection_timeout_is_retried_with_retry_status_callback(
self, async_client
):
failing_client = FailingBulkClient(
async_client,
fail_with=ApiError(
message="Connection timed out!",
body={},
meta=ApiResponseMeta(
status=522, headers={}, http_version="1.1", duration=0, node=None
),
),
)
docs = [
{"_index": "i", "_id": 47, "f": "v"},
{"_index": "i", "_id": 45, "f": "v"},
{"_index": "i", "_id": 42, "f": "v"},
]

results = [
x
async for x in helpers.async_streaming_bulk(
failing_client,
docs,
raise_on_exception=False,
raise_on_error=False,
chunk_size=1,
retry_on_status=522,
max_retries=1,
initial_backoff=0,
)
]
assert 3 == len(results)
assert [True, True, True] == [r[0] for r in results]
await async_client.indices.refresh(index="i")
res = await async_client.search(index="i")
assert {"value": 3, "relation": "eq"} == res["hits"]["total"]
assert 4 == failing_client._called


class TestBulk:
async def test_bulk_works_with_single_item(self, async_client):
39 changes: 39 additions & 0 deletions test_elasticsearch/test_server/test_helpers.py
@@ -288,6 +288,45 @@ def streaming_bulk():
assert 4 == failing_client._called


def test_connection_timeout_is_retried_with_retry_status_callback(sync_client):
failing_client = FailingBulkClient(
sync_client,
fail_with=ApiError(
message="Connection timed out!",
body={},
meta=ApiResponseMeta(
status=522, headers={}, http_version="1.1", duration=0, node=None
),
),
)
docs = [
{"_index": "i", "_id": 47, "f": "v"},
{"_index": "i", "_id": 45, "f": "v"},
{"_index": "i", "_id": 42, "f": "v"},
]

results = list(
helpers.streaming_bulk(
failing_client,
docs,
index="i",
raise_on_exception=False,
raise_on_error=False,
chunk_size=1,
retry_on_status=522,
max_retries=1,
initial_backoff=0,
)
)
assert 3 == len(results)
print(results)
assert [True, True, True] == [r[0] for r in results]
sync_client.indices.refresh(index="i")
res = sync_client.search(index="i")
assert {"value": 3, "relation": "eq"} == res["hits"]["total"]
assert 4 == failing_client._called


def test_bulk_works_with_single_item(sync_client):
docs = [{"answer": 42, "_id": 1}]
success, failed = helpers.bulk(sync_client, docs, index="test-index", refresh=True)
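The FailingBulkClient used in both tests is a fixture defined earlier in these test modules and is not part of this diff. As a rough sketch of what such a wrapper needs to provide for streaming_bulk, it could look like the following; the attribute names, default failure schedule, and the options() passthrough are assumptions.

class FailingBulkClientSketch:
    """Wraps a real client and raises `fail_with` on selected bulk() calls."""

    def __init__(self, client, fail_at=(2,), fail_with=None):
        self.client = client
        self.transport = client.transport  # streaming_bulk reads the serializer from here
        self._called = 0                   # total number of bulk() invocations
        self._fail_at = fail_at            # 1-based call numbers that should fail
        self._fail_with = fail_with

    def options(self, **kwargs):
        # streaming_bulk calls client.options(); returning self keeps the
        # failure bookkeeping attached to the wrapper.
        return self

    def bulk(self, *args, **kwargs):
        self._called += 1
        if self._fail_with is not None and self._called in self._fail_at:
            raise self._fail_with
        return self.client.bulk(*args, **kwargs)

With chunk_size=1, three documents, and one failing call that is retried once, bulk() ends up being invoked four times in total, which is what the final assert 4 == failing_client._called in each test checks.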