From b73d3a9de2b4dbc2d3a5bc071c1d47167ae5aec6 Mon Sep 17 00:00:00 2001 From: Vijayan Balasubramanian Date: Wed, 13 Mar 2024 14:18:25 -0700 Subject: [PATCH] refactor force merge --- osbenchmark/worker_coordinator/runner.py | 38 ++++++++++++++---------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index e31deca83..a429bb435 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -41,6 +41,7 @@ import ijson from opensearchpy import ConnectionTimeout +from opensearchpy.client import query_params from osbenchmark import exceptions, workload from osbenchmark.utils import convert @@ -676,30 +677,37 @@ class ForceMerge(Runner): Runs a force merge operation against OpenSearch. """ + # TODO Remove this method once opensearch-py dependency is released and updated to 2.5 + @query_params( + "allow_no_indices", + "expand_wildcards", + "flush", + "ignore_unavailable", + "max_num_segments", + "only_expunge_deletes", + "wait_for_completion" + ) + async def _force_merge(self, opensearch, index, params, headers): + force_merge_path = "/{}/_forcemerge".format(index) + return await opensearch.transport.perform_request("POST", force_merge_path, params=params, headers=headers) + async def __call__(self, opensearch, params): - # pylint: disable=import-outside-toplevel - import opensearchpy max_num_segments = params.get("max-num-segments") mode = params.get("mode") merge_params = self._default_kw_params(params) if max_num_segments: merge_params["max_num_segments"] = max_num_segments if mode == "polling": - complete = False - try: - request_context_holder.on_client_request_start() - await opensearch.indices.forcemerge(**merge_params) - request_context_holder.on_client_request_end() - complete = True - except opensearchpy.ConnectionTimeout: - pass - while not complete: - await asyncio.sleep(params.get("poll-period")) - tasks = await opensearch.tasks.list(params={"actions": "indices:admin/forcemerge"}) - if len(tasks["nodes"]) == 0: + merge_params["wait_for_completion"] = "false" + request_context_holder.on_client_request_start() + response_task = await self._force_merge(opensearch, **merge_params) + while True: + tasks = await opensearch.tasks.get(task_id=response_task['task']) + if tasks['completed']: # empty nodes response indicates no tasks request_context_holder.on_client_request_end() - complete = True + break + await asyncio.sleep(params.get("poll-period")) else: request_context_holder.on_client_request_start() await opensearch.indices.forcemerge(**merge_params)