diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index e31deca83..58d58441b 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -677,29 +677,21 @@ class ForceMerge(Runner): """ async def __call__(self, opensearch, params): - # pylint: disable=import-outside-toplevel - import opensearchpy max_num_segments = params.get("max-num-segments") mode = params.get("mode") merge_params = self._default_kw_params(params) if max_num_segments: merge_params["max_num_segments"] = max_num_segments if mode == "polling": - complete = False - try: - request_context_holder.on_client_request_start() - await opensearch.indices.forcemerge(**merge_params) - request_context_holder.on_client_request_end() - complete = True - except opensearchpy.ConnectionTimeout: - pass - while not complete: - await asyncio.sleep(params.get("poll-period")) - tasks = await opensearch.tasks.list(params={"actions": "indices:admin/forcemerge"}) - if len(tasks["nodes"]) == 0: - # empty nodes response indicates no tasks + merge_params["wait_for_completion"] = "false" + request_context_holder.on_client_request_start() + response_task = await opensearch.indices.forcemerge(**merge_params) + while True: + task = await opensearch.tasks.get(task_id=response_task['task']) + if task and task['completed']: request_context_holder.on_client_request_end() - complete = True + break + await asyncio.sleep(params.get("poll-period")) else: request_context_holder.on_client_request_start() await opensearch.indices.forcemerge(**merge_params) diff --git a/tests/worker_coordinator/runner_test.py b/tests/worker_coordinator/runner_test.py index 16fdd2730..92d7bf55f 100644 --- a/tests/worker_coordinator/runner_test.py +++ b/tests/worker_coordinator/runner_test.py @@ -1154,122 +1154,94 @@ async def test_force_merge_with_params(self, opensearch, on_client_request_start opensearch.indices.forcemerge.assert_called_once_with(index="_all", max_num_segments=1, request_timeout=50000) - @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') - @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') - @mock.patch("opensearchpy.OpenSearch") - @run_async - async def test_force_merge_with_polling_no_timeout(self, opensearch, on_client_request_start, on_client_request_end): - opensearch.indices.forcemerge.return_value = as_future() - - force_merge = runner.ForceMerge() - await force_merge(opensearch, params={"index" : "_all", "mode": "polling", 'poll-period': 0}) - opensearch.indices.forcemerge.assert_called_once_with(index="_all") - @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async async def test_force_merge_with_polling(self, opensearch, on_client_request_start, on_client_request_end): - opensearch.indices.forcemerge.return_value = as_future(exception=opensearchpy.ConnectionTimeout()) - opensearch.tasks.list.side_effect = [ - as_future({ - "nodes": { - "Ap3OfntPT7qL4CBeKvamxg": { - "name": "instance-0000000001", - "transport_address": "10.46.79.231:19693", - "host": "10.46.79.231", - "ip": "10.46.79.231:19693", - "roles": [ - "data", - "ingest", - "master", - "remote_cluster_client", - "transform" - ], - "attributes": { - "logical_availability_zone": "zone-1", - "server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526", - "availability_zone": "us-east4-a", - "instance_configuration": "gcp.data.highio.1", - "transform.node": "true", - "region": "unknown-region" - }, - "tasks": { - "Ap3OfntPT7qL4CBeKvamxg:417009036": { - "node": "Ap3OfntPT7qL4CBeKvamxg", - "id": 417009036, - "type": "transport", - "action": "indices:admin/forcemerge", - "start_time_in_millis": 1598018980850, - "running_time_in_nanos": 3659821411, - "cancellable": False, - "headers": {} - } - } - } - } - }), - as_future({ - "nodes": {} - }) - ] + opensearch.indices.forcemerge.return_value = as_future({"task": "7PtzISisT5SiwlBGUi2GzQ:2820798"}) + opensearch.tasks.get.return_value = as_future({ + "completed": True, + "task": { + "node": "7PtzISisT5SiwlBGUi2GzQ", + "id": 2820798, + "type": "transport", + "action": "indices:admin/forcemerge", + "description": "Force-merge indices [_all], , onlyExpungeDeletes[false], flush[true]", + "start_time_in_millis": 1711389911601, + "running_time_in_nanos": 2806258, + "cancellable": False, + "cancelled": False, + "headers": {} + }, + "response": { + "_shards": { + "total": 10, + "successful": 10, + "failed": 0 + } + } + }) force_merge = runner.ForceMerge() - await force_merge(opensearch, params={"index": "_all", "mode": "polling", "poll-period": 0}) - opensearch.indices.forcemerge.assert_called_once_with(index="_all") + await force_merge(opensearch, params={"index": "_all", "mode": "polling", 'poll-period': 0}) + opensearch.indices.forcemerge.assert_called_once_with(index="_all", wait_for_completion='false') + opensearch.tasks.get.assert_called_once_with(task_id="7PtzISisT5SiwlBGUi2GzQ:2820798") @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async async def test_force_merge_with_polling_and_params(self, opensearch, on_client_request_start, on_client_request_end): - opensearch.indices.forcemerge.return_value = as_future(exception=opensearchpy.ConnectionTimeout()) - opensearch.tasks.list.side_effect = [ + opensearch.indices.forcemerge.return_value = as_future({"task": "7PtzISisT5SiwlBGUi2GzQ:2820798"}) + opensearch.tasks.get.side_effect = [ as_future({ - "nodes": { - "Ap3OfntPT7qL4CBeKvamxg": { - "name": "instance-0000000001", - "transport_address": "10.46.79.231:19693", - "host": "10.46.79.231", - "ip": "10.46.79.231:19693", - "roles": [ - "data", - "ingest", - "master", - "remote_cluster_client", - "transform" - ], - "attributes": { - "logical_availability_zone": "zone-1", - "server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526", - "availability_zone": "us-east4-a", - "instance_configuration": "gcp.data.highio.1", - "transform.node": "true", - "region": "unknown-region" - }, - "tasks": { - "Ap3OfntPT7qL4CBeKvamxg:417009036": { - "node": "Ap3OfntPT7qL4CBeKvamxg", - "id": 417009036, - "type": "transport", - "action": "indices:admin/forcemerge", - "start_time_in_millis": 1598018980850, - "running_time_in_nanos": 3659821411, - "cancellable": False, - "headers": {} - } - } - } - } + "completed": False, + "task": { + "node": "7PtzISisT5SiwlBGUi2GzQ", + "id": 2820798, + "type": "transport", + "action": "indices:admin/forcemerge", + "description": "Force-merge indices [_all], , onlyExpungeDeletes[false], flush[true]", + "start_time_in_millis": 1711389911601, + "running_time_in_nanos": 2806258, + "cancellable": False, + "cancelled": False, + "headers": {} + }, + "response": {} }), as_future({ - "nodes": {} + "completed": True, + "task": { + "node": "7PtzISisT5SiwlBGUi2GzQ", + "id": 2820798, + "type": "transport", + "action": "indices:admin/forcemerge", + "description": "Force-merge indices [_all], , onlyExpungeDeletes[false], flush[true]", + "start_time_in_millis": 1711389911601, + "running_time_in_nanos": 2806258, + "cancellable": "false", + "cancelled": "false", + "headers": {} + }, + "response": { + "_shards": { + "total": 10, + "successful": 10, + "failed": 0 + } + } }) ] force_merge = runner.ForceMerge() # request-timeout should be ignored as mode:polling - await force_merge(opensearch, params={"index" : "_all", "mode": "polling", "max-num-segments": 1, - "request-timeout": 50000, "poll-period": 0}) - opensearch.indices.forcemerge.assert_called_once_with(index="_all", max_num_segments=1, request_timeout=50000) + await force_merge(opensearch, params={ + "index": "_all", "mode": "polling", "max-num-segments": 1, "request-timeout": 50000, "poll-period": 0 + }) + opensearch.indices.forcemerge.assert_called_once_with( + index="_all", max_num_segments=1, request_timeout=50000, wait_for_completion='false') + opensearch.tasks.get.assert_called_with(task_id="7PtzISisT5SiwlBGUi2GzQ:2820798") + self.assertEqual(opensearch.tasks.get.call_count, 2) class IndicesStatsRunnerTests(TestCase):