Skip to content

Commit

Permalink
refactor force merge
Browse files Browse the repository at this point in the history
  • Loading branch information
VijayanB committed Mar 13, 2024
1 parent 0029a65 commit b73d3a9
Showing 1 changed file with 23 additions and 15 deletions.
38 changes: 23 additions & 15 deletions osbenchmark/worker_coordinator/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

import ijson
from opensearchpy import ConnectionTimeout
from opensearchpy.client import query_params

from osbenchmark import exceptions, workload
from osbenchmark.utils import convert
Expand Down Expand Up @@ -676,30 +677,37 @@ class ForceMerge(Runner):
Runs a force merge operation against OpenSearch.
"""

# TODO Remove this method once opensearch-py dependency is released and updated to 2.5
@query_params(
    "allow_no_indices",
    "expand_wildcards",
    "flush",
    "ignore_unavailable",
    "max_num_segments",
    "only_expunge_deletes",
    "wait_for_completion",
)
async def _force_merge(self, opensearch, index=None, params=None, headers=None):
    """
    Sends a force merge request straight through the client's transport layer.

    NOTE(review): the ``query_params`` decorator is expected to collect the keyword
    arguments listed above into ``params`` before this body runs - confirm against the
    opensearch-py version in use.

    :param opensearch: Client whose ``transport.perform_request`` coroutine is awaited.
    :param index: Name of the index to merge, or a list/tuple of names. When it is
                  ``None`` or empty, the request targets ``/_forcemerge`` instead of
                  embedding a bogus index segment in the path.
    :param params: Query string parameters forwarded unchanged to the transport.
    :param headers: HTTP headers forwarded unchanged to the transport.
    :return: Whatever ``opensearch.transport.perform_request`` returns.
    """
    # A sequence of index names has to become a comma separated path segment;
    # formatting the list directly would put its Python repr into the URL.
    if isinstance(index, (list, tuple)):
        index = ",".join(str(name) for name in index)
    if index:
        force_merge_path = "/{}/_forcemerge".format(index)
    else:
        # Without this guard a missing index would be rendered as "/None/_forcemerge".
        force_merge_path = "/_forcemerge"
    return await opensearch.transport.perform_request("POST", force_merge_path, params=params, headers=headers)

async def __call__(self, opensearch, params):
# pylint: disable=import-outside-toplevel
import opensearchpy
max_num_segments = params.get("max-num-segments")
mode = params.get("mode")
merge_params = self._default_kw_params(params)
if max_num_segments:
merge_params["max_num_segments"] = max_num_segments
if mode == "polling":
complete = False
try:
request_context_holder.on_client_request_start()
await opensearch.indices.forcemerge(**merge_params)
request_context_holder.on_client_request_end()
complete = True
except opensearchpy.ConnectionTimeout:
pass
while not complete:
await asyncio.sleep(params.get("poll-period"))
tasks = await opensearch.tasks.list(params={"actions": "indices:admin/forcemerge"})
if len(tasks["nodes"]) == 0:
merge_params["wait_for_completion"] = "false"
request_context_holder.on_client_request_start()
response_task = await self._force_merge(opensearch, **merge_params)
while True:
tasks = await opensearch.tasks.get(task_id=response_task['task'])
if tasks['completed']:
# empty nodes response indicates no tasks
request_context_holder.on_client_request_end()
complete = True
break
await asyncio.sleep(params.get("poll-period"))
else:
request_context_holder.on_client_request_start()
await opensearch.indices.forcemerge(**merge_params)
Expand Down

0 comments on commit b73d3a9

Please sign in to comment.