Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create-Workload Improvements: Separate operations and test procedures from workload.json #446

35 changes: 5 additions & 30 deletions osbenchmark/resources/base-workload.json.j2
Original file line number Diff line number Diff line change
Expand Up @@ -22,35 +22,10 @@
]
}{% endfor %}
],
"schedule": [
{
"operation": "delete-index"
},{% raw %}
{
"operation": {
"operation-type": "create-index",
"settings": {{index_settings | default({}) | tojson}}
}
},{% endraw %}
{
"operation": {
"operation-type": "cluster-health",
"index": {{ indices | map(attribute='name') | list | join(',') | tojson }},{% raw %}
"request-params": {
"wait_for_status": "{{cluster_health | default('green')}}",
"wait_for_no_relocating_shards": "true"
},
"retry-until-success": true
}
},
{
"operation": {
"operation-type": "bulk",
"bulk-size": {{bulk_size | default(5000)}},
"ingest-percentage": {{ingest_percentage | default(100)}}
},
"clients": {{bulk_indexing_clients | default(8)}}
},{% endraw -%}
{% block queries %}{% endblock %}
"operations": [
{% raw %}{{ benchmark.collect(parts="operations/*.json") }}{% endraw %}
],
"test_procedures": [
{% raw %}{{ benchmark.collect(parts="test_procedures/*.json") }}{% endraw %}
]
}
16 changes: 16 additions & 0 deletions osbenchmark/resources/custom-operations.json.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"name": "index-append",
"operation-type": "bulk",
"bulk-size": {{bulk_size | default(5000)}},
"ingest-percentage": {{ingest_percentage | default(100)}}
},
{%- block queries -%}
{% for query in custom_queries %}
{
"name": "{{query.name}}",
"operation-type": "{{query['operation-type']}}",
"index": {{ indices | map(attribute='name') | list | join(',') | tojson }},
"body": {{query.body | replace("'", '"') }}
}{% if not loop.last %},{% endif -%}
{% endfor %}
{%- endblock %}
14 changes: 0 additions & 14 deletions osbenchmark/resources/custom-query-workload.json.j2

This file was deleted.

58 changes: 58 additions & 0 deletions osbenchmark/resources/custom-test-procedures.json.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
{
"name": "custom-test-procedures",
"description": "Customized test procedure with custom operations generated by create-workload feature in OpenSearch Benchmark.",
"default": true,
"schedule": [
{
"operation": "delete-index"
},{% raw %}
{
"operation": {
"operation-type": "create-index",
"settings": {{index_settings | default({}) | tojson}}
}
},{% endraw %}
{
"operation": {
"operation-type": "cluster-health",
"index": {{ indices | map(attribute='name') | list | join(',') | tojson }},{% raw -%}
"request-params": {
"wait_for_status": "{{cluster_health | default('green')}}",
{% endraw -%}
"wait_for_no_relocating_shards": "true"
},
"retry-until-success": true
}
},
{
"operation": "index-append",
{% raw -%}"clients": {{bulk_indexing_clients | default(8)}},
"ignore-response-error-level": "{{error_level | default('non-fatal')}}"
},
{% endraw -%}
{
"name": "refresh-after-index",
"operation": "refresh"
},
{%- block queries -%}
{% for query in custom_queries %}
{
"operation":"{{query.name}}",
{% raw -%}
"warmup-iterations": {{warmup_iterations | default(50)}},
"iterations": {{iterations | default(100)}}
{%- if not target_throughput %}
,"target-throughput": 3
{%- elif target_throughput is string and target_throughput.lower() == 'none' %}
{%- else %}
,"target-throughput": {{ target_throughput | tojson }}
{%- endif %}
{%-if search_clients is defined and search_clients %}
,"clients": {{ search_clients | tojson}}
{%- endif %}
{% endraw -%}
}{% if not loop.last %},{% endif -%}
{% endfor %}
{%- endblock %}
]
}
16 changes: 16 additions & 0 deletions osbenchmark/resources/default-operations.json.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"name": "index-append",
"operation-type": "bulk",
"bulk-size": {{bulk_size | default(5000)}},
"ingest-percentage": {{ingest_percentage | default(100)}}
},
{
"name": "default",
"operation-type": "search",
"index": {{ indices | map(attribute='name') | list | join(',') | tojson }},
"body": {
"query": {
"match_all": {}
}
}
}
16 changes: 0 additions & 16 deletions osbenchmark/resources/default-query-workload.json.j2

This file was deleted.

51 changes: 51 additions & 0 deletions osbenchmark/resources/default-test-procedures.json.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
{
"name": "default-test-procedure",
"description": "Customized test procedure with custom operations generated by create-workload feature in OpenSearch Benchmark. Workload deletes existing indexes, creates indexes, ingests documents, and runs default match-all query.",
"default": true,
"schedule": [
{
"operation": "delete-index"
},{% raw %}
{
"operation": {
"operation-type": "create-index",
"settings": {{index_settings | default({}) | tojson}}
}
},{% endraw %}
{
"operation": {
"operation-type": "cluster-health",
"index": {{ indices | map(attribute='name') | list | join(',') | tojson }},{% raw %}
"request-params": {
"wait_for_status": "{{cluster_health | default('green')}}",
"wait_for_no_relocating_shards": "true"
},
"retry-until-success": true
}
},{% endraw -%}
{
"operation": "index-append",
{% raw -%}"clients": {{bulk_indexing_clients | default(8)}},
"ignore-response-error-level": "{{error_level | default('non-fatal')}}"
},
{% endraw -%}
{
"name": "refresh-after-index",
"operation": "refresh"
},{% raw %}
{
"operation": "default",
"warmup-iterations": {{warmup_iterations | default(50)}},
"iterations": {{iterations | default(100)}}
{%- if not target_throughput %}
,"target-throughput": 3
{%- elif target_throughput is string and target_throughput.lower() == 'none' %}
{%- else %}
,"target-throughput": {{ target_throughput | tojson }}
{%- endif %}
{%-if search_clients is defined and search_clients %}
,"clients": {{ search_clients | tojson}}
{%- endif %}
}{% endraw -%}
]
}
75 changes: 60 additions & 15 deletions osbenchmark/workload_generator/workload_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import logging
import os
import shutil
import json

from opensearchpy import OpenSearchException
Expand All @@ -34,14 +35,6 @@
from osbenchmark.workload_generator import corpus, index
from osbenchmark.utils import io, opts, console


def process_template(templates_path, template_filename, template_vars, output_path):
    """Render a Jinja2 template and write the rendered text to a file.

    :param templates_path: Directory the ``FileSystemLoader`` searches for templates.
    :param template_filename: Name of the template file to load (including its extension).
    :param template_vars: Variables handed to ``template.render``.
    :param output_path: Destination file; opened in ``"w"`` mode, so any existing
        content is replaced.
    """
    env = Environment(loader=FileSystemLoader(templates_path), autoescape=select_autoescape(['html', 'xml']))
    template = env.get_template(template_filename)

    # Render before opening the destination so that a rendering error does not
    # leave behind a truncated, empty output file.
    rendered = template.render(template_vars)

    # Jinja's FileSystemLoader decodes template files as UTF-8 by default; write the
    # result with the same explicit encoding instead of the locale-dependent default.
    with open(output_path, "w", encoding="utf-8") as f:
        f.write(rendered)

def validate_indices_docs_map(indices, indices_docs_map, docs_were_requested):
if not docs_were_requested:
return
Expand Down Expand Up @@ -108,35 +101,80 @@ def process_custom_queries(custom_queries):

return data

def write_template(template_vars, templates_path, output_path, template_file):
    """Render one template and write the rendered text to ``output_path``.

    :param template_vars: Variables handed to ``template.render``.
    :param templates_path: Directory passed to ``get_template`` to locate the template.
    :param output_path: Destination file; opened in ``"w"`` mode, so any existing
        content is replaced.
    :param template_file: Template identifier passed to ``get_template`` (which
        resolves it to an actual template file).
    """
    template = get_template(template_file, templates_path)

    # Render before opening the destination so that a rendering error does not
    # leave behind a truncated, empty output file.
    rendered = template.render(template_vars)

    # Jinja's FileSystemLoader decodes template files as UTF-8 by default; write the
    # result with the same explicit encoding instead of the locale-dependent default.
    with open(output_path, "w", encoding="utf-8") as f:
        f.write(rendered)

def get_template(template_file, templates_path):
    """Load the Jinja2 template ``<template_file>.json.j2`` from ``templates_path``.

    :param template_file: Template name without the ``.json.j2`` suffix.
    :param templates_path: Directory the ``FileSystemLoader`` searches for templates.
    :return: The template object returned by ``Environment.get_template``.
    """
    # Callers pass the bare template name; the on-disk suffix is appended here.
    resolved_name = template_file + ".json.j2"

    loader = FileSystemLoader(templates_path)
    escape_policy = select_autoescape(['html', 'xml'])
    jinja_env = Environment(loader=loader, autoescape=escape_policy)

    return jinja_env.get_template(resolved_name)

def render_templates(
    workload_path, operations_path, test_procedures_path, templates_path, template_vars, custom_queries
):
    """Write the workload, operations and test-procedure files from their templates.

    The base workload template is always rendered. The operations and test-procedure
    templates are the "custom" variants when ``custom_queries`` is truthy and the
    "default" variants otherwise.

    :param workload_path: Output file for the "base-workload" template.
    :param operations_path: Output file for the selected operations template.
    :param test_procedures_path: Output file for the selected test-procedures template.
    :param templates_path: Directory containing the templates.
    :param template_vars: Variables passed through to every template.
    :param custom_queries: Selects the custom (truthy) or default (falsy) template pair.
    """
    write_template(template_vars, templates_path, workload_path, "base-workload")

    # Pick the template pair once, then render both files through the same path.
    if custom_queries:
        operations_template = "custom-operations"
        procedures_template = "custom-test-procedures"
    else:
        operations_template = "default-operations"
        procedures_template = "default-test-procedures"

    targets = (
        (operations_path, operations_template),
        (test_procedures_path, procedures_template),
    )
    for destination, template_name in targets:
        write_template(template_vars, templates_path, destination, template_name)

def create_workload(cfg):
logger = logging.getLogger(__name__)

# All inputs provided by user
workload_name = cfg.opts("workload", "workload.name")
indices = cfg.opts("generator", "indices")
root_path = cfg.opts("generator", "output.path")
target_hosts = cfg.opts("client", "hosts")
client_options = cfg.opts("client", "options")
number_of_docs = cfg.opts("generator", "number_of_docs")
unprocessed_custom_queries = cfg.opts("workload", "custom_queries")
templates_path = os.path.join(cfg.opts("node", "benchmark.root"), "resources")

# Process custom queries
custom_queries = process_custom_queries(unprocessed_custom_queries)

logger.info("Creating workload [%s] matching indices [%s]", workload_name, indices)
logger.info("Number of Docs: %s", number_of_docs)

# Initialize client factory
client = OsClientFactory(hosts=target_hosts.all_hosts[opts.TargetHosts.DEFAULT],
client_options=client_options.all_client_options[opts.TargetHosts.DEFAULT]).create()

info = client.info()
console.info(f"Connected to OpenSearch cluster [{info['name']}] version [{info['version']['number']}].\n", logger=logger)

# Establish output paths directory
output_path = os.path.abspath(os.path.join(io.normalize_path(root_path), workload_name))

operations_path = os.path.join(output_path, "operations")
test_procedures_path = os.path.join(output_path, "test_procedures")

try:
logger.info("Removing existing workload [%s] in path [%s]", workload_name, output_path)
shutil.rmtree(output_path)
except OSError:
pass

io.ensure_dir(output_path)
io.ensure_dir(operations_path)
io.ensure_dir(test_procedures_path)

# Extract Indices and Corpora
logger.info("Extracting indices and corpora")
indices, corpora = extract_mappings_and_corpora(client, output_path, indices, number_of_docs)
logger.info("Finished extracting indices and corpora")

if len(indices) == 0:
raise RuntimeError("Failed to extract any indices for workload!")

# Collect all items into dictionary
template_vars = {
"workload_name": workload_name,
"indices": indices,
Expand All @@ -147,12 +185,19 @@ def create_workload(cfg):
logger.info("Template Vars: %s", template_vars)

workload_path = os.path.join(output_path, "workload.json")
templates_path = os.path.join(cfg.opts("node", "benchmark.root"), "resources")

if custom_queries:
process_template(templates_path, "custom-query-workload.json.j2", template_vars, workload_path)
else:
process_template(templates_path, "default-query-workload.json.j2", template_vars, workload_path)
operations_path = os.path.join(operations_path, "default.json")
test_procedures_path = os.path.join(test_procedures_path, "default.json")

# Render all templates
logger.info("Rendering templates")
render_templates(
workload_path,
operations_path,
test_procedures_path,
templates_path,
template_vars,
custom_queries
)

console.println("")
console.info(f"Workload {workload_name} has been created. Run it with: {PROGRAM_NAME} --workload-path={output_path}")
Loading