Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create-Workload Improvements: Write Test Procedures and Operations into Separate Directories and Files #397

Closed
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,6 @@ recipes/ccr/ccr-target-hosts.json

# Tracker tracks
tracks/

# Visual Studio Code for Contributors
.vscode/
37 changes: 6 additions & 31 deletions osbenchmark/resources/base-workload.json.j2
Original file line number Diff line number Diff line change
Expand Up @@ -22,35 +22,10 @@
]
}{% endfor %}
],
"schedule": [
{
"operation": "delete-index"
},{% raw %}
{
"operation": {
"operation-type": "create-index",
"settings": {{index_settings | default({}) | tojson}}
}
},{% endraw %}
{
"operation": {
"operation-type": "cluster-health",
"index": {{ indices | map(attribute='name') | list | join(',') | tojson }},{% raw %}
"request-params": {
"wait_for_status": "{{cluster_health | default('green')}}",
"wait_for_no_relocating_shards": "true"
},
"retry-until-success": true
}
},
{
"operation": {
"operation-type": "bulk",
"bulk-size": {{bulk_size | default(5000)}},
"ingest-percentage": {{ingest_percentage | default(100)}}
},
"clients": {{bulk_indexing_clients | default(8)}}
},{% endraw -%}
{% block queries %}{% endblock %}
"operations": [
{% raw %}{{ benchmark.collect(parts="operations/*.json") }}{% endraw %}
],
"test_procedures": [
{% raw %}{{ benchmark.collect(parts="test_procedures/*.json") }}{% endraw %}
]
}
}
22 changes: 22 additions & 0 deletions osbenchmark/resources/custom-operations.json.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"operation": {
Copy link
Collaborator

@IanHoang IanHoang Oct 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The operations file includes extraneous fields for each operation.
Ingest operations should include at minimum name, operation-type, bulk-size, and ingest-percentage fields.

      "name": "index",
      "operation-type": "bulk",
      "bulk-size": {{bulk_size | default(10000)}},
      "ingest-percentage": {{ingest_percentage | default(100)}}
    }

Each search operation should include just the name, operation-type, index, and body fields.

        "name": "default",
        "operation-type": "search",
        "index": {{ indices | map(attribute='name') | list | join(',') | tojson }},
        "body": {
          "query": {
            "match_all": {}
          }
        }

Fields like search_clients or bulk_indexing_clients belong in test_procedures file. For reference, see NYC_Taxis workload's operations file: https://github.com/opensearch-project/opensearch-benchmark-workloads/blob/main/nyc_taxis/operations/default.json

"name": "index-append",
"operation-type": "bulk",
"bulk-size": {{bulk_size | default(5000)}},
"ingest-percentage": {{ingest_percentage | default(100)}}
},{% raw %}
"clients": {{bulk_indexing_clients | default(8)}}
},{% endraw %}
{
"operation": {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For custom operations, we need to remove the default operation. This should only be included in the default operations file. Users might not want a match_all query included in their workload if they are already providing their own queries.

"name": "default",
"operation-type": "search",
"index": {{ indices | map(attribute='name') | list | join(',') | tojson }},
"body": {
"query": {
"match_all": {}
}
}
},{% raw %}
"clients": {{search_clients | default(8)}}
}{% endraw %}
14 changes: 0 additions & 14 deletions osbenchmark/resources/custom-query-workload.json.j2

This file was deleted.

42 changes: 42 additions & 0 deletions osbenchmark/resources/custom-test-procedures.json.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
{
"name": "append-no-conflicts",
"description": "Indexes the whole document corpus using OpenSearch default settings. We only adjust the number of replicas as we benchmark a single node cluster and Benchmark will only start the benchmark if the cluster turns green. Document ids are unique so all index operations are append only.",
"default": true,
"schedule": [
{
"operation": "delete-index"
},{% raw %}
{
"operation": {
"operation-type": "create-index",
"settings": {{index_settings | default({}) | tojson}}
}
},{% endraw %}
{
"operation": {
"operation-type": "cluster-health",
"index": {{ indices | map(attribute='name') | list | join(',') | tojson }},{% raw %}
"request-params": {
"wait_for_status": "{{cluster_health | default('green')}}",
"wait_for_no_relocating_shards": "true"
},
"retry-until-success": true
}
},
{% endraw -%}
{%- block queries -%}
{% for query in custom_queries %}
{
"operation": {
"name": "{{query.name}}",
"operation-type": "{{query['operation-type']}}",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We just need to specify the name field and not the operation-type, index, and body since the operations are already defined in the operations/default.json directory / file. Instead, we should add parameters that the user can insert such as warmup-iterations, iterations, and search_clients. Use this search operation as reference: https://github.com/opensearch-project/opensearch-benchmark-workloads/blob/main/nyc_taxis/test_procedures/default.json#L56-L69

"index": {{ indices | map(attribute='name') | list | join(',') | tojson }},
"body": {{query.body | replace("'", '"') }}
}
}{% if not loop.last %},{% endif -%}
{% endfor %}
{%- endblock %}
}
]
}

16 changes: 16 additions & 0 deletions osbenchmark/resources/default-operations.json.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"name": "index-append",
"operation-type": "bulk",
"bulk-size": {{bulk_size | default(5000)}},
"ingest-percentage": {{ingest_percentage | default(100)}}
},
{
"name": "default",
"operation-type": "search",
"index": {{ indices | map(attribute='name') | list | join(',') | tojson }},
"body": {
"query": {
"match_all": {}
}
}
}
16 changes: 0 additions & 16 deletions osbenchmark/resources/default-query-workload.json.j2

This file was deleted.

29 changes: 29 additions & 0 deletions osbenchmark/resources/default-test-procedures.json.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{
"name": "append-no-conflicts",
"description": "Indexes the whole document corpus using OpenSearch default settings. We only adjust the number of replicas as we benchmark a single node cluster and Benchmark will only start the benchmark if the cluster turns green. Document ids are unique so all index operations are append only.",
"default": true,
"schedule": [
{
"operation": "delete-index"
},{% raw %}
{
"operation": {
"operation-type": "create-index",
"settings": {{index_settings | default({}) | tojson}}
}
},{% endraw %}
{
"operation": {
"operation-type": "cluster-health",
"index": {{ indices | map(attribute='name') | list | join(',') | tojson }},{% raw %}
"request-params": {
"wait_for_status": "{{cluster_health | default('green')}}",
"wait_for_no_relocating_shards": "true"
},
"retry-until-success": true
}
}
{% endraw -%}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is why the test is only running delete-index, create-index, and cluster-health. It also needs the default ingestion and search operations.

]
}

21 changes: 21 additions & 0 deletions osbenchmark/resources/test-procedures.json.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"operation": "delete-index"
},{% raw %}
{
"operation": {
"operation-type": "create-index",
"settings": {{index_settings | default({}) | tojson}}
}
},{% endraw %}
{
"operation": {
"operation-type": "cluster-health",
"index": {{ indices | map(attribute='name') | list | join(',') | tojson }},{% raw %}
"request-params": {
"wait_for_status": "{{cluster_health | default('green')}}",
"wait_for_no_relocating_shards": "true"
},
"retry-until-success": true
}
},{% endraw -%}
{% block queries %}{% endblock %}
49 changes: 38 additions & 11 deletions osbenchmark/workload_generator/corpus.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
Expand Down Expand Up @@ -42,12 +42,12 @@ def template_vars(index_name, out_path, doc_count):
"path": comp_outpath,
"doc_count": doc_count,
"uncompressed_bytes": os.path.getsize(out_path),
"compressed_bytes": os.path.getsize(comp_outpath)
"compressed_bytes": os.path.getsize(comp_outpath),
}


def get_doc_outpath(outdir, name, suffix=""):
return os.path.join(outdir, f"{name}-documents{suffix}.json")
def get_doc_outpath(outdir, suffix=""):
    """Build the path to a corpus documents file inside *outdir*.

    The optional *suffix* (e.g. "-1k") is inserted before the ".json"
    extension to distinguish variants such as the test-mode subset.
    """
    filename = "documents{}.json".format(suffix)
    return os.path.join(outdir, filename)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was name removed?



def extract(client, output_path, index, number_of_docs_requested=None):
Expand All @@ -64,16 +64,34 @@ def extract(client, output_path, index, number_of_docs_requested=None):

number_of_docs = client.count(index=index)["count"]

total_docs = number_of_docs if not number_of_docs_requested else min(number_of_docs, number_of_docs_requested)
total_docs = (
number_of_docs
if not number_of_docs_requested
else min(number_of_docs, number_of_docs_requested)
)

if total_docs > 0:
logger.info("[%d] total docs in index [%s]. Extracting [%s] docs.", number_of_docs, index, total_docs)
docs_path = get_doc_outpath(output_path, index)
dump_documents(client, index, get_doc_outpath(output_path, index, "-1k"), min(total_docs, 1000), " for test mode")
logger.info(
"[%d] total docs in index [%s]. Extracting [%s] docs.",
number_of_docs,
index,
total_docs,
)
docs_path = get_doc_outpath(output_path)
dump_documents(
client,
index,
get_doc_outpath(output_path, "-1k"),
min(total_docs, 1000),
" for test mode",
)
dump_documents(client, index, docs_path, total_docs)
return template_vars(index, docs_path, total_docs)
else:
logger.info("Skipping corpus extraction fo index [%s] as it contains no documents.", index)
logger.info(
"Skipping corpus extraction fo index [%s] as it contains no documents.",
index,
)
return None


Expand All @@ -94,12 +112,21 @@ def dump_documents(client, index, out_path, number_of_docs, progress_message_suf
for n, doc in enumerate(helpers.scan(client, query=query, index=index)):
if n >= number_of_docs:
break
data = (json.dumps(doc["_source"], separators=(",", ":")) + "\n").encode("utf-8")
data = (
json.dumps(doc["_source"], separators=(",", ":")) + "\n"
).encode("utf-8")

outfile.write(data)
comp_outfile.write(compressor.compress(data))

render_progress(progress, progress_message_suffix, index, n + 1, number_of_docs, freq)
render_progress(
progress,
progress_message_suffix,
index,
n + 1,
number_of_docs,
freq,
)

comp_outfile.write(compressor.flush())
progress.finish()
Expand Down
38 changes: 21 additions & 17 deletions osbenchmark/workload_generator/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
Expand All @@ -26,14 +26,16 @@
import logging
import os

INDEX_SETTINGS_EPHEMERAL_KEYS = ["uuid",
"creation_date",
"version",
"provided_name",
"store"]
INDEX_SETTINGS_EPHEMERAL_KEYS = [
"uuid",
"creation_date",
"version",
"provided_name",
"store",
]
INDEX_SETTINGS_PARAMETERS = {
"number_of_replicas": "{{{{number_of_replicas | default({orig})}}}}",
"number_of_shards": "{{{{number_of_shards | default({orig})}}}}"
"number_of_shards": "{{{{number_of_shards | default({orig})}}}}",
}


Expand Down Expand Up @@ -81,13 +83,13 @@ def extract_index_mapping_and_settings(client, index_pattern):
valid, reason = is_valid(index)
if valid:
mappings = details["mappings"]
index_settings = filter_ephemeral_index_settings(details["settings"]["index"])
index_settings = filter_ephemeral_index_settings(
details["settings"]["index"]
)
update_index_setting_parameters(index_settings)
results[index] = {
"mappings": mappings,
"settings": {
"index": index_settings
}
"settings": {"index": index_settings},
}
else:
logger.info("Skipping index [%s] (reason: %s).", index, reason)
Expand All @@ -107,14 +109,16 @@ def extract(client, outdir, index_pattern):

index_obj = extract_index_mapping_and_settings(client, index_pattern)
for index, details in index_obj.items():
filename = f"{index}.json"
filename = f"index.json"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you simplify this to just be named as index.json?

outpath = os.path.join(outdir, filename)
with open(outpath, "w") as outfile:
json.dump(details, outfile, indent=4, sort_keys=True)
outfile.write("\n")
results.append({
"name": index,
"path": outpath,
"filename": filename,
})
results.append(
{
"name": index,
"path": outpath,
"filename": filename,
}
)
return results
Loading