Skip to content

Commit

Permalink
Update force merge polling to use async
Browse files Browse the repository at this point in the history
Change the force merge polling logic to set wait_for_completion to false,
making the request asynchronous, and use the tasks get API to check
whether the task has completed before exiting from force merge.

Here, the request end time is calculated only after the task has completed.
This request end time will not be 100% accurate, since we use polling to check
whether the task is completed, so completion is only detected on the next poll.

Signed-off-by: Vijayan Balasubramanian <balasvij@amazon.com>
  • Loading branch information
VijayanB committed Mar 25, 2024
1 parent daf09e4 commit 8880977
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 113 deletions.
24 changes: 8 additions & 16 deletions osbenchmark/worker_coordinator/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,29 +677,21 @@ class ForceMerge(Runner):
"""

async def __call__(self, opensearch, params):
# pylint: disable=import-outside-toplevel
import opensearchpy
max_num_segments = params.get("max-num-segments")
mode = params.get("mode")
merge_params = self._default_kw_params(params)
if max_num_segments:
merge_params["max_num_segments"] = max_num_segments
if mode == "polling":
complete = False
try:
request_context_holder.on_client_request_start()
await opensearch.indices.forcemerge(**merge_params)
request_context_holder.on_client_request_end()
complete = True
except opensearchpy.ConnectionTimeout:
pass
while not complete:
await asyncio.sleep(params.get("poll-period"))
tasks = await opensearch.tasks.list(params={"actions": "indices:admin/forcemerge"})
if len(tasks["nodes"]) == 0:
# empty nodes response indicates no tasks
merge_params["wait_for_completion"] = "false"
request_context_holder.on_client_request_start()
response_task = await opensearch.indices.forcemerge(**merge_params)
while True:
task = await opensearch.tasks.get(task_id=response_task['task'])
if task and task['completed']:
request_context_holder.on_client_request_end()
complete = True
break
await asyncio.sleep(params.get("poll-period"))
else:
request_context_holder.on_client_request_start()
await opensearch.indices.forcemerge(**merge_params)
Expand Down
166 changes: 69 additions & 97 deletions tests/worker_coordinator/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1154,122 +1154,94 @@ async def test_force_merge_with_params(self, opensearch, on_client_request_start

opensearch.indices.forcemerge.assert_called_once_with(index="_all", max_num_segments=1, request_timeout=50000)

@mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end')
@mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start')
@mock.patch("opensearchpy.OpenSearch")
@run_async
async def test_force_merge_with_polling_no_timeout(self, opensearch, on_client_request_start, on_client_request_end):
opensearch.indices.forcemerge.return_value = as_future()

force_merge = runner.ForceMerge()
await force_merge(opensearch, params={"index" : "_all", "mode": "polling", 'poll-period': 0})
opensearch.indices.forcemerge.assert_called_once_with(index="_all")

@mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end')
@mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start')
@mock.patch("opensearchpy.OpenSearch")
@run_async
async def test_force_merge_with_polling(self, opensearch, on_client_request_start, on_client_request_end):
opensearch.indices.forcemerge.return_value = as_future(exception=opensearchpy.ConnectionTimeout())
opensearch.tasks.list.side_effect = [
as_future({
"nodes": {
"Ap3OfntPT7qL4CBeKvamxg": {
"name": "instance-0000000001",
"transport_address": "10.46.79.231:19693",
"host": "10.46.79.231",
"ip": "10.46.79.231:19693",
"roles": [
"data",
"ingest",
"master",
"remote_cluster_client",
"transform"
],
"attributes": {
"logical_availability_zone": "zone-1",
"server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526",
"availability_zone": "us-east4-a",
"instance_configuration": "gcp.data.highio.1",
"transform.node": "true",
"region": "unknown-region"
},
"tasks": {
"Ap3OfntPT7qL4CBeKvamxg:417009036": {
"node": "Ap3OfntPT7qL4CBeKvamxg",
"id": 417009036,
"type": "transport",
"action": "indices:admin/forcemerge",
"start_time_in_millis": 1598018980850,
"running_time_in_nanos": 3659821411,
"cancellable": False,
"headers": {}
}
}
}
}
}),
as_future({
"nodes": {}
})
]
opensearch.indices.forcemerge.return_value = as_future({"task": "7PtzISisT5SiwlBGUi2GzQ:2820798"})
opensearch.tasks.get.return_value = as_future({
"completed": True,
"task": {
"node": "7PtzISisT5SiwlBGUi2GzQ",
"id": 2820798,
"type": "transport",
"action": "indices:admin/forcemerge",
"description": "Force-merge indices [_all], , onlyExpungeDeletes[false], flush[true]",
"start_time_in_millis": 1711389911601,
"running_time_in_nanos": 2806258,
"cancellable": False,
"cancelled": False,
"headers": {}
},
"response": {
"_shards": {
"total": 10,
"successful": 10,
"failed": 0
}
}
})
force_merge = runner.ForceMerge()
await force_merge(opensearch, params={"index": "_all", "mode": "polling", "poll-period": 0})
opensearch.indices.forcemerge.assert_called_once_with(index="_all")
await force_merge(opensearch, params={"index": "_all", "mode": "polling", 'poll-period': 0})
opensearch.indices.forcemerge.assert_called_once_with(index="_all", wait_for_completion='false')
opensearch.tasks.get.assert_called_once_with(task_id="7PtzISisT5SiwlBGUi2GzQ:2820798")

@mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end')
@mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start')
@mock.patch("opensearchpy.OpenSearch")
@run_async
async def test_force_merge_with_polling_and_params(self, opensearch, on_client_request_start, on_client_request_end):
opensearch.indices.forcemerge.return_value = as_future(exception=opensearchpy.ConnectionTimeout())
opensearch.tasks.list.side_effect = [
opensearch.indices.forcemerge.return_value = as_future({"task": "7PtzISisT5SiwlBGUi2GzQ:2820798"})
opensearch.tasks.get.side_effect = [
as_future({
"nodes": {
"Ap3OfntPT7qL4CBeKvamxg": {
"name": "instance-0000000001",
"transport_address": "10.46.79.231:19693",
"host": "10.46.79.231",
"ip": "10.46.79.231:19693",
"roles": [
"data",
"ingest",
"master",
"remote_cluster_client",
"transform"
],
"attributes": {
"logical_availability_zone": "zone-1",
"server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526",
"availability_zone": "us-east4-a",
"instance_configuration": "gcp.data.highio.1",
"transform.node": "true",
"region": "unknown-region"
},
"tasks": {
"Ap3OfntPT7qL4CBeKvamxg:417009036": {
"node": "Ap3OfntPT7qL4CBeKvamxg",
"id": 417009036,
"type": "transport",
"action": "indices:admin/forcemerge",
"start_time_in_millis": 1598018980850,
"running_time_in_nanos": 3659821411,
"cancellable": False,
"headers": {}
}
}
}
}
"completed": False,
"task": {
"node": "7PtzISisT5SiwlBGUi2GzQ",
"id": 2820798,
"type": "transport",
"action": "indices:admin/forcemerge",
"description": "Force-merge indices [_all], , onlyExpungeDeletes[false], flush[true]",
"start_time_in_millis": 1711389911601,
"running_time_in_nanos": 2806258,
"cancellable": False,
"cancelled": False,
"headers": {}
},
"response": {}
}),
as_future({
"nodes": {}
"completed": True,
"task": {
"node": "7PtzISisT5SiwlBGUi2GzQ",
"id": 2820798,
"type": "transport",
"action": "indices:admin/forcemerge",
"description": "Force-merge indices [_all], , onlyExpungeDeletes[false], flush[true]",
"start_time_in_millis": 1711389911601,
"running_time_in_nanos": 2806258,
"cancellable": "false",
"cancelled": "false",
"headers": {}
},
"response": {
"_shards": {
"total": 10,
"successful": 10,
"failed": 0
}
}
})
]
force_merge = runner.ForceMerge()
# request-timeout is still passed through to forcemerge even with mode:polling
await force_merge(opensearch, params={"index" : "_all", "mode": "polling", "max-num-segments": 1,
"request-timeout": 50000, "poll-period": 0})
opensearch.indices.forcemerge.assert_called_once_with(index="_all", max_num_segments=1, request_timeout=50000)
await force_merge(opensearch, params={
"index": "_all", "mode": "polling", "max-num-segments": 1, "request-timeout": 50000, "poll-period": 0
})
opensearch.indices.forcemerge.assert_called_once_with(
index="_all", max_num_segments=1, request_timeout=50000, wait_for_completion='false')
opensearch.tasks.get.assert_called_with(task_id="7PtzISisT5SiwlBGUi2GzQ:2820798")
self.assertEqual(opensearch.tasks.get.call_count, 2)


class IndicesStatsRunnerTests(TestCase):
Expand Down

0 comments on commit 8880977

Please sign in to comment.