Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create-Workload Improvements: Separate operations and test procedures from workload.json #446

35 changes: 5 additions & 30 deletions osbenchmark/resources/base-workload.json.j2
Original file line number Diff line number Diff line change
Expand Up @@ -22,35 +22,10 @@
]
}{% endfor %}
],
"schedule": [
{
"operation": "delete-index"
},{% raw %}
{
"operation": {
"operation-type": "create-index",
"settings": {{index_settings | default({}) | tojson}}
}
},{% endraw %}
{
"operation": {
"operation-type": "cluster-health",
"index": {{ indices | map(attribute='name') | list | join(',') | tojson }},{% raw %}
"request-params": {
"wait_for_status": "{{cluster_health | default('green')}}",
"wait_for_no_relocating_shards": "true"
},
"retry-until-success": true
}
},
{
"operation": {
"operation-type": "bulk",
"bulk-size": {{bulk_size | default(5000)}},
"ingest-percentage": {{ingest_percentage | default(100)}}
},
"clients": {{bulk_indexing_clients | default(8)}}
},{% endraw -%}
{% block queries %}{% endblock %}
"operations": [
{% raw %}{{ benchmark.collect(parts="operations/*.json") }}{% endraw %}
],
"test_procedures": [
{% raw %}{{ benchmark.collect(parts="test_procedures/*.json") }}{% endraw %}
IanHoang marked this conversation as resolved.
Show resolved Hide resolved
]
}
27 changes: 27 additions & 0 deletions osbenchmark/resources/custom-operations.json.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"name": "index-append",
"operation-type": "bulk",
"bulk-size": {{bulk_size | default(5000)}},
"ingest-percentage": {{ingest_percentage | default(100)}}
},
{
"name": "wait-until-merges-finish",
"operation-type": "index-stats",
"index": "_all",
"condition": {
"path": "_all.total.merges.current",
"expected-value": 0
},
"retry-until-success": true,
"include-in-reporting": false
},
{%- block queries -%}
{% for query in custom_queries %}
{
"name": "{{query.name}}",
"operation-type": "{{query['operation-type']}}",
"index": {{ indices | map(attribute='name') | list | join(',') | tojson }},
"body": {{query.body | replace("'", '"') }}
}{% if not loop.last %},{% endif -%}
{% endfor %}
{%- endblock %}
14 changes: 0 additions & 14 deletions osbenchmark/resources/custom-query-workload.json.j2

This file was deleted.

64 changes: 64 additions & 0 deletions osbenchmark/resources/custom-test-procedures.json.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
{
"name": "custom-test-procedures",
"description": "Customized test procedure with custom operations generated by create-workload feature in OpenSearch Benchmark.",
"default": true,
"schedule": [
{
"operation": "delete-index"
},
{
"operation": {
"operation-type": "create-index",
{% raw %}"settings": {{ index_settings | default({}) | tojson }}
{% endraw %}}
},
{
"operation": {
"operation-type": "cluster-health",
"index": {{ indices | map(attribute='name') | list | join(',') | tojson }},
"request-params": {
{% raw %}"wait_for_status": "{{ cluster_health | default('green') }}",
{% endraw -%}"wait_for_no_relocating_shards": "true"
},
"retry-until-success": true
}
},
{
"operation": "index-append",
{% raw -%}"clients": {{ bulk_indexing_clients | default(8) }},
{% endraw -%}
{% raw -%}"ignore-response-error-level": "{{ error_level | default('non-fatal') }}"
{% endraw -%}},
{
"name": "refresh-after-index",
"operation": "refresh"
},
{
"operation": {
"operation-type": "force-merge",
"request-timeout": 7200{%- if force_merge_max_num_segments is defined %},
"max-num-segments": {{ force_merge_max_num_segments | tojson }}
{%- endif %}
}
},
{
"name": "refresh-after-force-merge",
"operation": "refresh"
},
{
"operation": "wait-until-merges-finish"
},
{%- block queries -%}
{% for query in custom_queries %}
{
"operation":"{{query.name}}",
{% raw -%}
"warmup-iterations": {{ warmup_iterations | default(50) }},
"iterations": {{ iterations | default(100) }},
"target-throughput": {{ target_throughput | default(3) }},
"clients": {{ search_clients | default(1) }}
{% endraw -%}}{% if not loop.last %},{% endif -%}
{% endfor %}
{%- endblock %}
]
}
28 changes: 28 additions & 0 deletions osbenchmark/resources/default-operations.json.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"name": "index-append",
"operation-type": "bulk",
"bulk-size": {{bulk_size | default(5000)}},
"ingest-percentage": {{ingest_percentage | default(100)}}
},
{
"name": "wait-until-merges-finish",
"operation-type": "index-stats",
"index": "_all",
"condition": {
"path": "_all.total.merges.current",
"expected-value": 0
},
"retry-until-success": true,
"include-in-reporting": false
},
{
"name": "match-all",
"operation-type": "search",
"index": {{ indices | map(attribute='name') | list | join(',') | tojson }},
"body": {
"size": {{match_all_size | default(10)}},
"query": {
"match_all": {}
IanHoang marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
16 changes: 0 additions & 16 deletions osbenchmark/resources/default-query-workload.json.j2

This file was deleted.

59 changes: 59 additions & 0 deletions osbenchmark/resources/default-test-procedures.json.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
{
"name": "default-test-procedure",
"description": "Customized test procedure with custom operations generated by create-workload feature in OpenSearch Benchmark. Workload deletes existing indexes, creates indexes, ingests documents, and runs default match-all query.",
"default": true,
"schedule": [
{
"operation": "delete-index"
},
{
"operation": {
"operation-type": "create-index",
{% raw %}"settings": {{ index_settings | default({}) | tojson }}
{% endraw %}}
},
{
"operation": {
"operation-type": "cluster-health",
"index": {{ indices | map(attribute='name') | list | join(',') | tojson }},
"request-params": {
{% raw %}"wait_for_status": "{{ cluster_health | default('green') }}",
{% endraw -%}"wait_for_no_relocating_shards": "true"
},
"retry-until-success": true
}
},
{
"operation": "index-append",
{% raw -%}"clients": {{ bulk_indexing_clients | default(8) }},
{% endraw -%}
{% raw -%}"ignore-response-error-level": "{{ error_level | default('non-fatal') }}"
{% endraw -%}},
{
"name": "refresh-after-index",
"operation": "refresh"
},
{
"operation": {
"operation-type": "force-merge",
"request-timeout": 7200{%- if force_merge_max_num_segments is defined %},
"max-num-segments": {{ force_merge_max_num_segments | tojson }}
{%- endif %}
}
},
{
"name": "refresh-after-force-merge",
"operation": "refresh"
},
{
"operation": "wait-until-merges-finish"
},{% raw %}
{
IanHoang marked this conversation as resolved.
Show resolved Hide resolved
"operation": "match-all",
"warmup-iterations": {{ warmup_iterations | default(50) }},
"iterations": {{ iterations | default(100) }},
"target-throughput": {{ target_throughput | default(3) }},
"clients": {{ search_clients | default(1) }}
}{% endraw -%}
]
}
79 changes: 64 additions & 15 deletions osbenchmark/workload_generator/workload_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import logging
import os
import shutil
import json

from opensearchpy import OpenSearchException
Expand All @@ -34,14 +35,6 @@
from osbenchmark.workload_generator import corpus, index
from osbenchmark.utils import io, opts, console


def process_template(templates_path, template_filename, template_vars, output_path):
env = Environment(loader=FileSystemLoader(templates_path), autoescape=select_autoescape(['html', 'xml']))
template = env.get_template(template_filename)

with open(output_path, "w") as f:
f.write(template.render(template_vars))

def validate_indices_docs_map(indices, indices_docs_map, docs_were_requested):
if not docs_were_requested:
return
Expand Down Expand Up @@ -108,35 +101,84 @@ def process_custom_queries(custom_queries):

return data

def write_template(template_vars, templates_path, output_path, template_file):
template = get_template(template_file, templates_path)
with open(output_path, "w") as f:
f.write(template.render(template_vars))

def get_template(template_file, templates_path):
template_file_name = template_file + ".json.j2"

env = Environment(loader=FileSystemLoader(templates_path), autoescape=select_autoescape(['html', 'xml']))

return env.get_template(template_file_name)

def render_templates(workload_path,
operations_path,
test_procedures_path,
templates_path,
template_vars,
custom_queries):
write_template(template_vars, templates_path, workload_path, "base-workload")

if custom_queries:
write_template(template_vars, templates_path, operations_path, "custom-operations")
write_template(template_vars, templates_path, test_procedures_path, "custom-test-procedures")
else:
write_template(template_vars, templates_path, operations_path, "default-operations")
write_template(template_vars, templates_path, test_procedures_path, "default-test-procedures")

def create_workload(cfg):
logger = logging.getLogger(__name__)

# All inputs provided by user
workload_name = cfg.opts("workload", "workload.name")
indices = cfg.opts("generator", "indices")
root_path = cfg.opts("generator", "output.path")
target_hosts = cfg.opts("client", "hosts")
client_options = cfg.opts("client", "options")
number_of_docs = cfg.opts("generator", "number_of_docs")
unprocessed_custom_queries = cfg.opts("workload", "custom_queries")
templates_path = os.path.join(cfg.opts("node", "benchmark.root"), "resources")

# Process custom queries
custom_queries = process_custom_queries(unprocessed_custom_queries)

logger.info("Creating workload [%s] matching indices [%s]", workload_name, indices)
logger.info("Number of Docs: %s", number_of_docs)

# Initialize client factory
client = OsClientFactory(hosts=target_hosts.all_hosts[opts.TargetHosts.DEFAULT],
client_options=client_options.all_client_options[opts.TargetHosts.DEFAULT]).create()

info = client.info()
console.info(f"Connected to OpenSearch cluster [{info['name']}] version [{info['version']['number']}].\n", logger=logger)

# Establish output paths directory
output_path = os.path.abspath(os.path.join(io.normalize_path(root_path), workload_name))

operations_path = os.path.join(output_path, "operations")
test_procedures_path = os.path.join(output_path, "test_procedures")

if os.path.exists(output_path):
try:
logger.info("Workload already exists. Removing existing workload [%s] in path [%s]", workload_name, output_path)
shutil.rmtree(output_path)
except OSError:
logger.error("Had issues removing existing workload [%s] in path [%s]", workload_name, output_path)

io.ensure_dir(output_path)
io.ensure_dir(operations_path)
io.ensure_dir(test_procedures_path)

# Extract Indices and Corpora
logger.info("Extracting indices and corpora")
indices, corpora = extract_mappings_and_corpora(client, output_path, indices, number_of_docs)
logger.info("Finished extracting indices and corpora")

if len(indices) == 0:
raise RuntimeError("Failed to extract any indices for workload!")

# Collect all itmes into dictionary
template_vars = {
"workload_name": workload_name,
"indices": indices,
Expand All @@ -147,12 +189,19 @@ def create_workload(cfg):
logger.info("Template Vars: %s", template_vars)

workload_path = os.path.join(output_path, "workload.json")
templates_path = os.path.join(cfg.opts("node", "benchmark.root"), "resources")

if custom_queries:
process_template(templates_path, "custom-query-workload.json.j2", template_vars, workload_path)
else:
process_template(templates_path, "default-query-workload.json.j2", template_vars, workload_path)
operations_path = os.path.join(operations_path, "default.json")
test_procedures_path = os.path.join(test_procedures_path, "default.json")

# Render all templates
logger.info("Rendering templates")
render_templates(
workload_path,
operations_path,
test_procedures_path,
templates_path,
template_vars,
custom_queries
)

console.println("")
console.info(f"Workload {workload_name} has been created. Run it with: {PROGRAM_NAME} --workload-path={output_path}")
Loading