Skip to content

Commit f6e3a66

Browse files
artem-shelkovnikovelastic
authored and
elastic
committed
Add retry to ESIndex class methods (#2171)
1 parent e9a5060 commit f6e3a66

File tree

2 files changed

+53
-29
lines changed

2 files changed

+53
-29
lines changed

connectors/es/index.py

+49-27
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
# or more contributor license agreements. Licensed under the Elastic License 2.0;
44
# you may not use this file except in compliance with the Elastic License 2.0.
55
#
6+
from functools import partial
7+
68
from elasticsearch import ApiError
79

810
from connectors.es import ESClient
@@ -50,10 +52,14 @@ async def fetch_by_id(self, doc_id):
5052

5153
async def fetch_response_by_id(self, doc_id):
5254
if not self.serverless:
53-
await self.client.indices.refresh(index=self.index_name)
55+
await self._retrier.execute_with_retry(
56+
partial(self.client.indices.refresh, index=self.index_name)
57+
)
5458

5559
try:
56-
resp = await self.client.get(index=self.index_name, id=doc_id)
60+
resp = await self._retrier.execute_with_retry(
61+
partial(self.client.get, index=self.index_name, id=doc_id)
62+
)
5763
except ApiError as e:
5864
logger.critical(f"The server returned {e.status_code}")
5965
logger.critical(e.body, exc_info=True)
@@ -65,30 +71,41 @@ async def fetch_response_by_id(self, doc_id):
6571
return resp.body
6672

6773
async def index(self, doc):
68-
return await self.client.index(index=self.index_name, document=doc)
74+
return await self._retrier.execute_with_retry(
75+
partial(self.client.index, index=self.index_name, document=doc)
76+
)
6977

7078
async def clean_index(self):
71-
return await self.client.delete_by_query(
72-
index=self.index_name,
73-
body={"query": {"match_all": {}}},
74-
ignore_unavailable=True,
75-
conflicts="proceed",
79+
return await self._retrier.execute_with_retry(
80+
partial(
81+
self.client.delete_by_query,
82+
index=self.index_name,
83+
body={"query": {"match_all": {}}},
84+
ignore_unavailable=True,
85+
conflicts="proceed",
86+
)
7687
)
7788

7889
async def update(self, doc_id, doc, if_seq_no=None, if_primary_term=None):
79-
return await self.client.update(
80-
index=self.index_name,
81-
id=doc_id,
82-
doc=doc,
83-
if_seq_no=if_seq_no,
84-
if_primary_term=if_primary_term,
90+
return await self._retrier.execute_with_retry(
91+
partial(
92+
self.client.update,
93+
index=self.index_name,
94+
id=doc_id,
95+
doc=doc,
96+
if_seq_no=if_seq_no,
97+
if_primary_term=if_primary_term,
98+
)
8599
)
86100

87101
async def update_by_script(self, doc_id, script):
88-
return await self.client.update(
89-
index=self.index_name,
90-
id=doc_id,
91-
script=script,
102+
return await self._retrier.execute_with_retry(
103+
partial(
104+
self.client.update,
105+
index=self.index_name,
106+
id=doc_id,
107+
script=script,
108+
)
92109
)
93110

94111
async def get_all_docs(self, query=None, sort=None, page_size=DEFAULT_PAGE_SIZE):
@@ -103,7 +120,9 @@ async def get_all_docs(self, query=None, sort=None, page_size=DEFAULT_PAGE_SIZE)
103120
Iterator
104121
"""
105122
if not self.serverless:
106-
await self.client.indices.refresh(index=self.index_name)
123+
await self._retrier.execute_with_retry(
124+
partial(self.client.indices.refresh, index=self.index_name)
125+
)
107126

108127
if query is None:
109128
query = {"match_all": {}}
@@ -113,14 +132,17 @@ async def get_all_docs(self, query=None, sort=None, page_size=DEFAULT_PAGE_SIZE)
113132

114133
while True:
115134
try:
116-
resp = await self.client.search(
117-
index=self.index_name,
118-
query=query,
119-
sort=sort,
120-
from_=offset,
121-
size=page_size,
122-
expand_wildcards="hidden",
123-
seq_no_primary_term=True,
135+
resp = await self._retrier.execute_with_retry(
136+
partial(
137+
self.client.search,
138+
index=self.index_name,
139+
query=query,
140+
sort=sort,
141+
from_=offset,
142+
size=page_size,
143+
expand_wildcards="hidden",
144+
seq_no_primary_term=True,
145+
)
124146
)
125147
except ApiError as e:
126148
logger.error(

tests/es/test_index.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ async def test_fetch_response_by_id_not_found(mock_responses):
118118

119119

120120
@pytest.mark.asyncio
121-
async def test_fetch_response_by_id_api_error(mock_responses):
121+
async def test_fetch_response_by_id_api_error(mock_responses, patch_sleep):
122122
doc_id = "1"
123123
index = FakeIndex(index_name, config)
124124
mock_responses.post(
@@ -129,6 +129,7 @@ async def test_fetch_response_by_id_api_error(mock_responses):
129129
f"http://nowhere.com:9200/{index_name}/_doc/{doc_id}",
130130
headers=headers,
131131
status=500,
132+
repeat=True,
132133
)
133134

134135
with pytest.raises(ApiError):
@@ -201,7 +202,7 @@ async def test_update_by_script():
201202

202203

203204
@pytest.mark.asyncio
204-
async def test_get_all_docs_with_error(mock_responses, patch_logger):
205+
async def test_get_all_docs_with_error(mock_responses, patch_logger, patch_sleep):
205206
index = FakeIndex(index_name, config)
206207
mock_responses.post(
207208
f"http://nowhere.com:9200/{index_name}/_refresh", headers=headers, status=200
@@ -211,6 +212,7 @@ async def test_get_all_docs_with_error(mock_responses, patch_logger):
211212
f"http://nowhere.com:9200/{index_name}/_search?expand_wildcards=hidden",
212213
headers=headers,
213214
status=500,
215+
repeat=True,
214216
)
215217

216218
with pytest.raises(ApiError) as e:

0 commit comments

Comments
 (0)