From e17abfd3d38c99e46a18528002fcee074ec579a9 Mon Sep 17 00:00:00 2001 From: Vijayan Balasubramanian Date: Wed, 13 Mar 2024 14:18:25 -0700 Subject: [PATCH] Update force merge polling to use async operation Change the force merge polling logic to set wait_for_completion to false, making the request asynchronous, and use the tasks get API to check whether the task has completed before exiting the force merge. Here, the request end time is calculated only after the task is completed. This request end time will not be 100% accurate, since we use polling to check whether the task has completed. Signed-off-by: Vijayan Balasubramanian --- osbenchmark/worker_coordinator/runner.py | 24 ++-- tests/worker_coordinator/runner_test.py | 166 ++++++++++------------- 2 files changed, 77 insertions(+), 113 deletions(-) diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index e31deca83..58d58441b 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -677,29 +677,21 @@ class ForceMerge(Runner): """ async def __call__(self, opensearch, params): - # pylint: disable=import-outside-toplevel - import opensearchpy max_num_segments = params.get("max-num-segments") mode = params.get("mode") merge_params = self._default_kw_params(params) if max_num_segments: merge_params["max_num_segments"] = max_num_segments if mode == "polling": - complete = False - try: - request_context_holder.on_client_request_start() - await opensearch.indices.forcemerge(**merge_params) - request_context_holder.on_client_request_end() - complete = True - except opensearchpy.ConnectionTimeout: - pass - while not complete: - await asyncio.sleep(params.get("poll-period")) - tasks = await opensearch.tasks.list(params={"actions": "indices:admin/forcemerge"}) - if len(tasks["nodes"]) == 0: - # empty nodes response indicates no tasks + merge_params["wait_for_completion"] = "false" + request_context_holder.on_client_request_start() + response_task = await 
opensearch.indices.forcemerge(**merge_params) + while True: + task = await opensearch.tasks.get(task_id=response_task['task']) + if task and task['completed']: request_context_holder.on_client_request_end() - complete = True + break + await asyncio.sleep(params.get("poll-period")) else: request_context_holder.on_client_request_start() await opensearch.indices.forcemerge(**merge_params) diff --git a/tests/worker_coordinator/runner_test.py b/tests/worker_coordinator/runner_test.py index 16fdd2730..92d7bf55f 100644 --- a/tests/worker_coordinator/runner_test.py +++ b/tests/worker_coordinator/runner_test.py @@ -1154,122 +1154,94 @@ async def test_force_merge_with_params(self, opensearch, on_client_request_start opensearch.indices.forcemerge.assert_called_once_with(index="_all", max_num_segments=1, request_timeout=50000) - @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') - @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') - @mock.patch("opensearchpy.OpenSearch") - @run_async - async def test_force_merge_with_polling_no_timeout(self, opensearch, on_client_request_start, on_client_request_end): - opensearch.indices.forcemerge.return_value = as_future() - - force_merge = runner.ForceMerge() - await force_merge(opensearch, params={"index" : "_all", "mode": "polling", 'poll-period': 0}) - opensearch.indices.forcemerge.assert_called_once_with(index="_all") - @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async async def test_force_merge_with_polling(self, opensearch, on_client_request_start, on_client_request_end): - opensearch.indices.forcemerge.return_value = as_future(exception=opensearchpy.ConnectionTimeout()) - opensearch.tasks.list.side_effect = [ - as_future({ - "nodes": { - "Ap3OfntPT7qL4CBeKvamxg": { - "name": "instance-0000000001", - "transport_address": 
"10.46.79.231:19693", - "host": "10.46.79.231", - "ip": "10.46.79.231:19693", - "roles": [ - "data", - "ingest", - "master", - "remote_cluster_client", - "transform" - ], - "attributes": { - "logical_availability_zone": "zone-1", - "server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526", - "availability_zone": "us-east4-a", - "instance_configuration": "gcp.data.highio.1", - "transform.node": "true", - "region": "unknown-region" - }, - "tasks": { - "Ap3OfntPT7qL4CBeKvamxg:417009036": { - "node": "Ap3OfntPT7qL4CBeKvamxg", - "id": 417009036, - "type": "transport", - "action": "indices:admin/forcemerge", - "start_time_in_millis": 1598018980850, - "running_time_in_nanos": 3659821411, - "cancellable": False, - "headers": {} - } - } - } - } - }), - as_future({ - "nodes": {} - }) - ] + opensearch.indices.forcemerge.return_value = as_future({"task": "7PtzISisT5SiwlBGUi2GzQ:2820798"}) + opensearch.tasks.get.return_value = as_future({ + "completed": True, + "task": { + "node": "7PtzISisT5SiwlBGUi2GzQ", + "id": 2820798, + "type": "transport", + "action": "indices:admin/forcemerge", + "description": "Force-merge indices [_all], , onlyExpungeDeletes[false], flush[true]", + "start_time_in_millis": 1711389911601, + "running_time_in_nanos": 2806258, + "cancellable": False, + "cancelled": False, + "headers": {} + }, + "response": { + "_shards": { + "total": 10, + "successful": 10, + "failed": 0 + } + } + }) force_merge = runner.ForceMerge() - await force_merge(opensearch, params={"index": "_all", "mode": "polling", "poll-period": 0}) - opensearch.indices.forcemerge.assert_called_once_with(index="_all") + await force_merge(opensearch, params={"index": "_all", "mode": "polling", 'poll-period': 0}) + opensearch.indices.forcemerge.assert_called_once_with(index="_all", wait_for_completion='false') + opensearch.tasks.get.assert_called_once_with(task_id="7PtzISisT5SiwlBGUi2GzQ:2820798") @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') 
@mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async async def test_force_merge_with_polling_and_params(self, opensearch, on_client_request_start, on_client_request_end): - opensearch.indices.forcemerge.return_value = as_future(exception=opensearchpy.ConnectionTimeout()) - opensearch.tasks.list.side_effect = [ + opensearch.indices.forcemerge.return_value = as_future({"task": "7PtzISisT5SiwlBGUi2GzQ:2820798"}) + opensearch.tasks.get.side_effect = [ as_future({ - "nodes": { - "Ap3OfntPT7qL4CBeKvamxg": { - "name": "instance-0000000001", - "transport_address": "10.46.79.231:19693", - "host": "10.46.79.231", - "ip": "10.46.79.231:19693", - "roles": [ - "data", - "ingest", - "master", - "remote_cluster_client", - "transform" - ], - "attributes": { - "logical_availability_zone": "zone-1", - "server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526", - "availability_zone": "us-east4-a", - "instance_configuration": "gcp.data.highio.1", - "transform.node": "true", - "region": "unknown-region" - }, - "tasks": { - "Ap3OfntPT7qL4CBeKvamxg:417009036": { - "node": "Ap3OfntPT7qL4CBeKvamxg", - "id": 417009036, - "type": "transport", - "action": "indices:admin/forcemerge", - "start_time_in_millis": 1598018980850, - "running_time_in_nanos": 3659821411, - "cancellable": False, - "headers": {} - } - } - } - } + "completed": False, + "task": { + "node": "7PtzISisT5SiwlBGUi2GzQ", + "id": 2820798, + "type": "transport", + "action": "indices:admin/forcemerge", + "description": "Force-merge indices [_all], , onlyExpungeDeletes[false], flush[true]", + "start_time_in_millis": 1711389911601, + "running_time_in_nanos": 2806258, + "cancellable": False, + "cancelled": False, + "headers": {} + }, + "response": {} }), as_future({ - "nodes": {} + "completed": True, + "task": { + "node": "7PtzISisT5SiwlBGUi2GzQ", + "id": 2820798, + "type": "transport", + "action": "indices:admin/forcemerge", + "description": 
"Force-merge indices [_all], , onlyExpungeDeletes[false], flush[true]", + "start_time_in_millis": 1711389911601, + "running_time_in_nanos": 2806258, + "cancellable": "false", + "cancelled": "false", + "headers": {} + }, + "response": { + "_shards": { + "total": 10, + "successful": 10, + "failed": 0 + } + } }) ] force_merge = runner.ForceMerge() # request-timeout should be ignored as mode:polling - await force_merge(opensearch, params={"index" : "_all", "mode": "polling", "max-num-segments": 1, - "request-timeout": 50000, "poll-period": 0}) - opensearch.indices.forcemerge.assert_called_once_with(index="_all", max_num_segments=1, request_timeout=50000) + await force_merge(opensearch, params={ + "index": "_all", "mode": "polling", "max-num-segments": 1, "request-timeout": 50000, "poll-period": 0 + }) + opensearch.indices.forcemerge.assert_called_once_with( + index="_all", max_num_segments=1, request_timeout=50000, wait_for_completion='false') + opensearch.tasks.get.assert_called_with(task_id="7PtzISisT5SiwlBGUi2GzQ:2820798") + self.assertEqual(opensearch.tasks.get.call_count, 2) class IndicesStatsRunnerTests(TestCase):