[testing-on-gke] Update output parsing based on workload config parsers (#2271)

* update parse_tests scripts based on workload config parsers

* Remove hardcoded configurations from output parsers

* add support for project_number and config-file arguments in output-parsers

* fix some rebase glitches

* fix some logs

* cosmetic improvements

* fix errors in script runtimes

* don't fail on missing cpu/memory metrics

* remove redundant line

* address review comments
gargnitingoogle authored Aug 21, 2024
1 parent 447afa3 commit 51abe5c
Showing 3 changed files with 285 additions and 111 deletions.
177 changes: 125 additions & 52 deletions perfmetrics/scripts/testing_on_gke/examples/dlio/parse_logs.py
@@ -15,13 +15,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import argparse
 import json, os, pprint, subprocess
 import sys
+import dlio_workload
 
 sys.path.append("../")
 from utils.utils import get_memory, get_cpu, standard_timestamp, is_mash_installed
 
-LOCAL_LOGS_LOCATION = "../../bin/dlio-logs"
+_LOCAL_LOGS_LOCATION = "../../bin/dlio-logs"
 
 record = {
     "pod_name": "",
@@ -38,45 +40,78 @@
     "lowest_memory": 0,
     "highest_cpu": 0.0,
     "lowest_cpu": 0.0,
+    "gcsfuse_mount_options": "",
 }
 
-if __name__ == "__main__":
-  bucketNames = [
-      "gke-dlio-unet3d-100kb-500k",
-      "gke-dlio-unet3d-150mb-5k",
-      "gke-dlio-unet3d-3mb-100k",
-      "gke-dlio-unet3d-500kb-1m",
-  ]
-
-  try:
-    os.makedirs(LOCAL_LOGS_LOCATION)
-  except FileExistsError:
-    pass
-
-  for bucketName in bucketNames:
-    print(f"Download DLIO logs from the bucket {bucketName}...")
+
+def downloadDlioOutputs(dlioWorkloads):
+  for dlioWorkload in dlioWorkloads:
+    print(f"Downloading DLIO logs from the bucket {dlioWorkload.bucket}...")
     result = subprocess.run(
         [
-            "gsutil",
-            "-m",
+            "gcloud",
+            "-q",  # ignore prompts
+            "storage",
             "cp",
             "-r",
-            f"gs://{bucketName}/logs",
-            LOCAL_LOGS_LOCATION,
+            "--no-user-output-enabled",  # do not print names of files being copied
+            f"gs://{dlioWorkload.bucket}/logs",
+            _LOCAL_LOGS_LOCATION,
         ],
         capture_output=False,
         text=True,
     )
+    if result.returncode < 0:
+      print(f"failed to fetch DLIO logs, error: {result.stderr}")
+
+
+if __name__ == "__main__":
+  parser = argparse.ArgumentParser(
+      prog="DLIO Unet3d test output parser",
+      description=(
+          "This program takes in a json workload configuration file and parses"
+          " it for valid DLIO workloads and the locations of their test outputs"
+          " on GCS. It downloads each such output object locally to"
+          " {_LOCAL_LOGS_LOCATION} and parses them for DLIO test runs, and then"
+          " dumps their output metrics into a CSV report file."
+      ),
+  )
+  parser.add_argument(
+      "--workload-config",
+      help=(
+          "A json configuration file to define workloads that were run to"
+          " generate the outputs that should be parsed."
+      ),
+      required=True,
+  )
+  parser.add_argument(
+      "--project-number",
+      help=(
+          "project-number (e.g. 93817472919) is needed to fetch the cpu/memory"
+          " utilization data from GCP."
+      ),
+      required=True,
+  )
+  args = parser.parse_args()
+
+  try:
+    os.makedirs(_LOCAL_LOGS_LOCATION)
+  except FileExistsError:
+    pass
+
+  dlioWorkloads = dlio_workload.ParseTestConfigForDlioWorkloads(
+      args.workload_config
+  )
+  downloadDlioOutputs(dlioWorkloads)
 
   """
   "{num_files_train}-{mean_file_size}-{batch_size}":
       "mean_file_size": str
       "num_files_train": str
       "batch_size": str
       "records":
           "local-ssd": [record1, record2, record3, record4]
+          "gcsfuse-generic": [record1, record2, record3, record4]
           "gcsfuse-file-cache": [record1, record2, record3, record4]
           "gcsfuse-no-file-cache": [record1, record2, record3, record4]
   """
@@ -85,15 +120,31 @@
   if not mash_installed:
     print("Mash is not installed, will skip parsing CPU and memory usage.")
 
-  for root, _, files in os.walk(LOCAL_LOGS_LOCATION):
+  for root, _, files in os.walk(_LOCAL_LOGS_LOCATION):
     if files:
       print(f"Parsing directory {root} ...")
       per_epoch_stats_file = root + "/per_epoch_stats.json"
       summary_file = root + "/summary.json"
 
+      gcsfuse_mount_options = ""
+      gcsfuse_mount_options_file = root + "/gcsfuse_mount_options"
+      if os.path.isfile(gcsfuse_mount_options_file):
+        with open(gcsfuse_mount_options_file) as f:
+          gcsfuse_mount_options = f.read().strip()
+
       with open(per_epoch_stats_file, "r") as f:
-        per_epoch_stats_data = json.load(f)
+        try:
+          per_epoch_stats_data = json.load(f)
+        except:
+          print(f"failed to json-parse {per_epoch_stats_file}")
+          continue
+
       with open(summary_file, "r") as f:
-        summary_data = json.load(f)
+        try:
+          summary_data = json.load(f)
+        except:
+          print(f"failed to json-parse {summary_file}")
+          continue
 
       for i in range(summary_data["epochs"]):
         test_name = summary_data["hostname"]
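The two guarded loads above repeat a parse-or-skip pattern. A hypothetical refactor (not in this commit) would factor it into a helper and narrow the bare except to json.JSONDecodeError:

# Hypothetical helper, not in this commit: same parse-or-skip behavior as the
# two try/except blocks above, with the bare except narrowed.
import json


def try_load_json(path):
  """Returns parsed JSON from path, or None if the file is malformed."""
  try:
    with open(path) as f:
      return json.load(f)
  except json.JSONDecodeError:
    print(f"failed to json-parse {path}")
    return None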
@@ -107,6 +158,7 @@
               "batch_size": part_list[4],
               "records": {
                   "local-ssd": [],
+                  "gcsfuse-generic": [],
                   "gcsfuse-file-cache": [],
                   "gcsfuse-no-file-cache": [],
               },
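The records sub-dict now carries the new gcsfuse-generic scenario. For illustration, one aggregated output entry (structure from the earlier docstring, values invented; the key "500000-102400-800" is taken from the old output_order list removed further below):

# Illustrative only: one entry of the aggregated `output` dict, keyed as
# "{num_files_train}-{mean_file_size}-{batch_size}". All values are invented.
example_output = {
    "500000-102400-800": {
        "num_files_train": "500000",
        "mean_file_size": "102400",
        "batch_size": "800",
        "records": {
            "local-ssd": [],  # one record dict per epoch
            "gcsfuse-generic": [],
            "gcsfuse-file-cache": [],
            "gcsfuse-no-file-cache": [],
        },
    },
}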
@@ -134,11 +186,20 @@
         r["end"] = standard_timestamp(per_epoch_stats_data[str(i + 1)]["end"])
         if r["scenario"] != "local-ssd" and mash_installed:
           r["lowest_memory"], r["highest_memory"] = get_memory(
-              r["pod_name"], r["start"], r["end"]
+              r["pod_name"],
+              r["start"],
+              r["end"],
+              project_number=args.project_number,
           )
           r["lowest_cpu"], r["highest_cpu"] = get_cpu(
-              r["pod_name"], r["start"], r["end"]
+              r["pod_name"],
+              r["start"],
+              r["end"],
+              project_number=args.project_number,
           )
         pass
 
+        r["gcsfuse_mount_options"] = gcsfuse_mount_options
+
         pprint.pprint(r)
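get_memory and get_cpu are used here as returning (lowest, highest) pairs for the pod over the [start, end] window, now scoped by project_number. Per the "don't fail on missing cpu/memory metrics" note in the commit message, missing data should not abort the run; the actual tolerance presumably lives in utils.utils, but a standalone sketch of the idea:

# Sketch only, not part of this commit: wrap a metric fetch so a missing time
# series degrades to the record's zero defaults instead of failing the run.
def safe_metric(fetch, pod_name, start, end, project_number, default=(0, 0)):
  try:
    return fetch(pod_name, start, end, project_number=project_number)
  except Exception as e:  # assumed failure mode; real handling may differ
    print(f"no metrics for {pod_name}: {e}")
    return default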

@@ -147,28 +208,23 @@
 
         output[key]["records"][r["scenario"]][i] = r
 
-  output_order = [
-      "500000-102400-800",
-      "500000-102400-128",
-      "1000000-512000-800",
-      "1000000-512000-128",
-      "100000-3145728-200",
-      "5000-157286400-4",
+  scenario_order = [
+      "local-ssd",
+      "gcsfuse-generic",
+      "gcsfuse-no-file-cache",
+      "gcsfuse-file-cache",
   ]
-  scenario_order = ["local-ssd", "gcsfuse-no-file-cache", "gcsfuse-file-cache"]
 
   output_file = open("./output.csv", "a")
   output_file.write(
       "File Size,File #,Total Size (GB),Batch Size,Scenario,Epoch,Duration"
       " (s),GPU Utilization (%),Throughput (sample/s),Throughput"
       " (MB/s),Throughput over Local SSD (%),GCSFuse Lowest Memory (MB),GCSFuse"
       " Highest Memory (MB),GCSFuse Lowest CPU (core),GCSFuse Highest CPU"
-      " (core),Pod,Start,End\n"
+      " (core),Pod,Start,End,GcsfuseMountOptions\n"
   )
 
-  for key in output_order:
-    if key not in output:
-      continue
+  for key in output:
     record_set = output[key]
     total_size = int(
         int(record_set["mean_file_size"])
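The report rows below are assembled as hand-built f-strings, with only the mount-options field quoted by hand. A hypothetical alternative (not what the commit does) using the standard csv module, which quotes any field as needed:

# Hypothetical alternative to the manual f-string rows: csv.writer quotes
# fields (e.g. mount options containing commas) automatically. Assumes the
# file was opened with newline="".
import csv


def write_row(csv_file, record_set, total_size, scenario, r):
  csv.writer(csv_file).writerow([
      record_set["mean_file_size"],
      record_set["num_files_train"],
      total_size,
      record_set["batch_size"],
      scenario,
      r["epoch"],
      r["duration"],
      r["train_au_percentage"],
      r["train_throughput_samples_per_second"],
      r["train_throughput_mb_per_second"],
      r["throughput_over_local_ssd"],
      r["lowest_memory"],
      r["highest_memory"],
      r["lowest_cpu"],
      r["highest_cpu"],
      r["pod_name"],
      r["start"],
      r["end"],
      r["gcsfuse_mount_options"],
  ])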
@@ -177,21 +233,38 @@
       )
 
     for scenario in scenario_order:
-      for i in range(len(record_set["records"]["local-ssd"])):
-        r = record_set["records"][scenario][i]
-        r["throughput_over_local_ssd"] = round(
-            r["train_throughput_mb_per_second"]
-            / record_set["records"]["local-ssd"][i][
-                "train_throughput_mb_per_second"
-            ]
-            * 100,
-            2,
-        )
-        output_file.write(
-            f"{record_set['mean_file_size']},{record_set['num_files_train']},{total_size},{record_set['batch_size']},{scenario},"
-        )
-        output_file.write(
-            f"{r['epoch']},{r['duration']},{r['train_au_percentage']},{r['train_throughput_samples_per_second']},{r['train_throughput_mb_per_second']},{r['throughput_over_local_ssd']},{r['lowest_memory']},{r['highest_memory']},{r['lowest_cpu']},{r['highest_cpu']},{r['pod_name']},{r['start']},{r['end']}\n"
-        )
+      if scenario not in record_set["records"]:
+        print(f"{scenario} not in output so skipping")
+        continue
+      if "local-ssd" in record_set["records"] and (
+          len(record_set["records"]["local-ssd"])
+          == len(record_set["records"][scenario])
+      ):
+        for i in range(len(record_set["records"]["local-ssd"])):
+          r = record_set["records"][scenario][i]
+          r["throughput_over_local_ssd"] = round(
+              r["train_throughput_mb_per_second"]
+              / record_set["records"]["local-ssd"][i][
+                  "train_throughput_mb_per_second"
+              ]
+              * 100,
+              2,
+          )
+          output_file.write(
+              f"{record_set['mean_file_size']},{record_set['num_files_train']},{total_size},{record_set['batch_size']},{scenario},"
+          )
+          output_file.write(
+              f"{r['epoch']},{r['duration']},{r['train_au_percentage']},{r['train_throughput_samples_per_second']},{r['train_throughput_mb_per_second']},{r['throughput_over_local_ssd']},{r['lowest_memory']},{r['highest_memory']},{r['lowest_cpu']},{r['highest_cpu']},{r['pod_name']},{r['start']},{r['end']},\"{r['gcsfuse_mount_options']}\"\n"
+          )
+      else:
+        for i in range(len(record_set["records"][scenario])):
+          r = record_set["records"][scenario][i]
+          r["throughput_over_local_ssd"] = "NA"
+          output_file.write(
+              f"{record_set['mean_file_size']},{record_set['num_files_train']},{total_size},{record_set['batch_size']},{scenario},"
+          )
+          output_file.write(
+              f"{r['epoch']},{r['duration']},{r['train_au_percentage']},{r['train_throughput_samples_per_second']},{r['train_throughput_mb_per_second']},{r['throughput_over_local_ssd']},{r['lowest_memory']},{r['highest_memory']},{r['lowest_cpu']},{r['highest_cpu']},{r['pod_name']},{r['start']},{r['end']},\"{r['gcsfuse_mount_options']}\"\n"
+          )
 
   output_file.close()
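For concreteness, the relative-throughput computation in the matched-lengths branch above, worked with invented numbers:

# Worked example (invented numbers): a gcsfuse scenario at 850 MB/s against a
# local-ssd baseline of 1000 MB/s for the same epoch index.
gcsfuse_mbps = 850.0
local_ssd_mbps = 1000.0
throughput_over_local_ssd = round(gcsfuse_mbps / local_ssd_mbps * 100, 2)
assert throughput_over_local_ssd == 85.0  # reported as a percentage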
