From 51abe5c48be28c9e36058cffbece97b841d064c0 Mon Sep 17 00:00:00 2001
From: Nitin Garg <113666283+gargnitingoogle@users.noreply.github.com>
Date: Wed, 21 Aug 2024 16:56:44 +0530
Subject: [PATCH] [testing-on-gke] Update output parsing based on workload config parsers (#2271)

* update parse_tests scripts based on workload config parsers
* Remove hardcoded configurations from output parsers
* add support for project_number and config-file arguments in output-parsers
* fix some rebase glitches
* fix some logs
* cosmetic improvements
* fix errors in script runtimes
* dont fail on missing cpu/memory metrics
* remove redundant line
* address review comments
---
 .../examples/dlio/parse_logs.py               | 177 +++++++++++-----
 .../testing_on_gke/examples/fio/parse_logs.py | 199 +++++++++++++-----
 .../testing_on_gke/examples/utils/utils.py    |  20 +-
 3 files changed, 285 insertions(+), 111 deletions(-)

diff --git a/perfmetrics/scripts/testing_on_gke/examples/dlio/parse_logs.py b/perfmetrics/scripts/testing_on_gke/examples/dlio/parse_logs.py
index 0a586129cb..eddf0beef0 100644
--- a/perfmetrics/scripts/testing_on_gke/examples/dlio/parse_logs.py
+++ b/perfmetrics/scripts/testing_on_gke/examples/dlio/parse_logs.py
@@ -15,13 +15,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import argparse
 import json, os, pprint, subprocess
 import sys
+import dlio_workload
 
 sys.path.append("../")
 from utils.utils import get_memory, get_cpu, standard_timestamp, is_mash_installed
 
-LOCAL_LOGS_LOCATION = "../../bin/dlio-logs"
+_LOCAL_LOGS_LOCATION = "../../bin/dlio-logs"
 
 record = {
     "pod_name": "",
@@ -38,31 +40,23 @@
     "lowest_memory": 0,
     "highest_cpu": 0.0,
     "lowest_cpu": 0.0,
+    "gcsfuse_mount_options": "",
 }
 
-if __name__ == "__main__":
-  bucketNames = [
-      "gke-dlio-unet3d-100kb-500k",
-      "gke-dlio-unet3d-150mb-5k",
-      "gke-dlio-unet3d-3mb-100k",
-      "gke-dlio-unet3d-500kb-1m",
-  ]
-
-  try:
-    os.makedirs(LOCAL_LOGS_LOCATION)
-  except FileExistsError:
-    pass
-
-  for bucketName in bucketNames:
-    print(f"Download DLIO logs from the bucket {bucketName}...")
+
+def downloadDlioOutputs(dlioWorkloads):
+  for dlioWorkload in dlioWorkloads:
+    print(f"Downloading DLIO logs from the bucket {dlioWorkload.bucket}...")
     result = subprocess.run(
         [
-            "gsutil",
-            "-m",
+            "gcloud",
+            "-q",  # ignore prompts
+            "storage",
             "cp",
             "-r",
-            f"gs://{bucketName}/logs",
-            LOCAL_LOGS_LOCATION,
+            "--no-user-output-enabled",  # do not print names of files being copied
+            f"gs://{dlioWorkload.bucket}/logs",
+            _LOCAL_LOGS_LOCATION,
         ],
         capture_output=False,
         text=True,
@@ -70,6 +64,46 @@
     if result.returncode < 0:
       print(f"failed to fetch DLIO logs, error: {result.stderr}")
 
+
+if __name__ == "__main__":
+  parser = argparse.ArgumentParser(
+      prog="DLIO Unet3d test output parser",
+      description=(
+          "This program takes in a json workload configuration file and parses"
+          " it for valid DLIO workloads and the locations of their test outputs"
+          " on GCS. It downloads each such output object locally to"
+          " {_LOCAL_LOGS_LOCATION} and parses them for DLIO test runs, and then"
+          " dumps their output metrics into a CSV report file."
+      ),
+  )
+  parser.add_argument(
+      "--workload-config",
+      help=(
+          "A json configuration file to define workloads that were run to"
+          " generate the outputs that should be parsed."
+      ),
+      required=True,
+  )
+  parser.add_argument(
+      "--project-number",
+      help=(
+          "project-number (e.g. 93817472919) is needed to fetch the cpu/memory"
+          " utilization data from GCP."
+      ),
+      required=True,
+  )
+  args = parser.parse_args()
+
+  try:
+    os.makedirs(_LOCAL_LOGS_LOCATION)
+  except FileExistsError:
+    pass
+
+  dlioWorkloads = dlio_workload.ParseTestConfigForDlioWorkloads(
+      args.workload_config
+  )
+  downloadDlioOutputs(dlioWorkloads)
+
   """
   "{num_files_train}-{mean_file_size}-{batch_size}":
       "mean_file_size": str
@@ -77,6 +111,7 @@
       "batch_size": str
       "records":
           "local-ssd": [record1, record2, record3, record4]
+          "gcsfuse-generic": [record1, record2, record3, record4]
           "gcsfuse-file-cache": [record1, record2, record3, record4]
           "gcsfuse-no-file-cache": [record1, record2, record3, record4]
   """
@@ -85,15 +120,31 @@
   if not mash_installed:
     print("Mash is not installed, will skip parsing CPU and memory usage.")
 
-  for root, _, files in os.walk(LOCAL_LOGS_LOCATION):
+  for root, _, files in os.walk(_LOCAL_LOGS_LOCATION):
     if files:
+      print(f"Parsing directory {root} ...")
       per_epoch_stats_file = root + "/per_epoch_stats.json"
       summary_file = root + "/summary.json"
 
+      gcsfuse_mount_options = ""
+      gcsfuse_mount_options_file = root + "/gcsfuse_mount_options"
+      if os.path.isfile(gcsfuse_mount_options_file):
+        with open(gcsfuse_mount_options_file) as f:
+          gcsfuse_mount_options = f.read().strip()
+
       with open(per_epoch_stats_file, "r") as f:
-        per_epoch_stats_data = json.load(f)
+        try:
+          per_epoch_stats_data = json.load(f)
+        except:
+          print(f"failed to json-parse {per_epoch_stats_file}")
+          continue
+
       with open(summary_file, "r") as f:
-        summary_data = json.load(f)
+        try:
+          summary_data = json.load(f)
+        except:
+          print(f"failed to json-parse {summary_file}")
+          continue
 
       for i in range(summary_data["epochs"]):
         test_name = summary_data["hostname"]
@@ -107,6 +158,7 @@
             "batch_size": part_list[4],
             "records": {
                 "local-ssd": [],
+                "gcsfuse-generic": [],
                 "gcsfuse-file-cache": [],
                 "gcsfuse-no-file-cache": [],
             },
@@ -134,11 +186,20 @@
         r["end"] = standard_timestamp(per_epoch_stats_data[str(i + 1)]["end"])
         if r["scenario"] != "local-ssd" and mash_installed:
           r["lowest_memory"], r["highest_memory"] = get_memory(
-              r["pod_name"], r["start"], r["end"]
+              r["pod_name"],
+              r["start"],
+              r["end"],
+              project_number=args.project_number,
           )
           r["lowest_cpu"], r["highest_cpu"] = get_cpu(
-              r["pod_name"], r["start"], r["end"]
+              r["pod_name"],
+              r["start"],
+              r["end"],
+              project_number=args.project_number,
           )
+          pass
+
+        r["gcsfuse_mount_options"] = gcsfuse_mount_options
 
         pprint.pprint(r)
 
@@ -147,15 +208,12 @@
         output[key]["records"][r["scenario"]][i] = r
 
-  output_order = [
-      "500000-102400-800",
-      "500000-102400-128",
-      "1000000-512000-800",
-      "1000000-512000-128",
-      "100000-3145728-200",
-      "5000-157286400-4",
+  scenario_order = [
+      "local-ssd",
+      "gcsfuse-generic",
+      "gcsfuse-no-file-cache",
+      "gcsfuse-file-cache",
   ]
-  scenario_order = ["local-ssd", "gcsfuse-no-file-cache", "gcsfuse-file-cache"]
 
   output_file = open("./output.csv", "a")
   output_file.write(
@@ -163,12 +221,10 @@
       " (s),GPU Utilization (%),Throughput (sample/s),Throughput"
       " (MB/s),Throughput over Local SSD (%),GCSFuse Lowest Memory (MB),GCSFuse"
       " Highest Memory (MB),GCSFuse Lowest CPU (core),GCSFuse Highest CPU"
-      " (core),Pod,Start,End\n"
+      " (core),Pod,Start,End,GcsfuseMountOptions\n"
   )
 
-  for key in output_order:
-    if key not in output:
-      continue
+  for key in output:
     record_set = output[key]
     total_size = int(
         int(record_set["mean_file_size"])
@@ -177,21 +233,38 @@
     )
 
     for scenario in scenario_order:
-      for i in range(len(record_set["records"]["local-ssd"])):
-        r = record_set["records"][scenario][i]
-        r["throughput_over_local_ssd"] = round(
r["train_throughput_mb_per_second"] - / record_set["records"]["local-ssd"][i][ - "train_throughput_mb_per_second" - ] - * 100, - 2, - ) - output_file.write( - f"{record_set['mean_file_size']},{record_set['num_files_train']},{total_size},{record_set['batch_size']},{scenario}," - ) - output_file.write( - f"{r['epoch']},{r['duration']},{r['train_au_percentage']},{r['train_throughput_samples_per_second']},{r['train_throughput_mb_per_second']},{r['throughput_over_local_ssd']},{r['lowest_memory']},{r['highest_memory']},{r['lowest_cpu']},{r['highest_cpu']},{r['pod_name']},{r['start']},{r['end']}\n" - ) + if scenario not in record_set["records"]: + print(f"{scenario} not in output so skipping") + continue + if "local-ssd" in record_set["records"] and ( + len(record_set["records"]["local-ssd"]) + == len(record_set["records"][scenario]) + ): + for i in range(len(record_set["records"]["local-ssd"])): + r = record_set["records"][scenario][i] + r["throughput_over_local_ssd"] = round( + r["train_throughput_mb_per_second"] + / record_set["records"]["local-ssd"][i][ + "train_throughput_mb_per_second" + ] + * 100, + 2, + ) + output_file.write( + f"{record_set['mean_file_size']},{record_set['num_files_train']},{total_size},{record_set['batch_size']},{scenario}," + ) + output_file.write( + f"{r['epoch']},{r['duration']},{r['train_au_percentage']},{r['train_throughput_samples_per_second']},{r['train_throughput_mb_per_second']},{r['throughput_over_local_ssd']},{r['lowest_memory']},{r['highest_memory']},{r['lowest_cpu']},{r['highest_cpu']},{r['pod_name']},{r['start']},{r['end']},\"{r['gcsfuse_mount_options']}\"\n" + ) + else: + for i in range(len(record_set["records"][scenario])): + r = record_set["records"][scenario][i] + r["throughput_over_local_ssd"] = "NA" + output_file.write( + f"{record_set['mean_file_size']},{record_set['num_files_train']},{total_size},{record_set['batch_size']},{scenario}," + ) + output_file.write( + f"{r['epoch']},{r['duration']},{r['train_au_percentage']},{r['train_throughput_samples_per_second']},{r['train_throughput_mb_per_second']},{r['throughput_over_local_ssd']},{r['lowest_memory']},{r['highest_memory']},{r['lowest_cpu']},{r['highest_cpu']},{r['pod_name']},{r['start']},{r['end']},\"{r['gcsfuse_mount_options']}\"\n" + ) output_file.close() diff --git a/perfmetrics/scripts/testing_on_gke/examples/fio/parse_logs.py b/perfmetrics/scripts/testing_on_gke/examples/fio/parse_logs.py index 374c8ff6e1..1cfc6a64c9 100644 --- a/perfmetrics/scripts/testing_on_gke/examples/fio/parse_logs.py +++ b/perfmetrics/scripts/testing_on_gke/examples/fio/parse_logs.py @@ -15,13 +15,15 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import argparse
 import json, os, pprint, subprocess
 import sys
+import fio_workload
 
 sys.path.append("../")
 from utils.utils import get_memory, get_cpu, unix_to_timestamp, is_mash_installed
 
-LOCAL_LOGS_LOCATION = "../../bin/fio-logs"
+_LOCAL_LOGS_LOCATION = "../../bin/fio-logs"
 
 record = {
     "pod_name": "",
@@ -37,36 +39,31 @@
     "lowest_memory": 0,
     "highest_cpu": 0.0,
     "lowest_cpu": 0.0,
+    "gcsfuse_mount_options": "",
+    "blockSize": "",
+    "filesPerThread": 0,
+    "numThreads": 0,
 }
 
-if __name__ == "__main__":
-  logLocations = [
-      ("gke-fio-64k-1m", "64K"),
-      ("gke-fio-128k-1m", "128K"),
-      ("gke-fio-1mb-1m", "1M"),
-      ("gke-fio-100mb-50k", "100M"),
-      ("gke-fio-200gb-1", "200G"),
-  ]
-
-  try:
-    os.makedirs(LOCAL_LOGS_LOCATION)
-  except FileExistsError:
-    pass
-
-  for folder, fileSize in logLocations:
+
+def downloadFioOutputs(fioWorkloads):
+  for fioWorkload in fioWorkloads:
     try:
-      os.makedirs(LOCAL_LOGS_LOCATION + "/" + fileSize)
+      os.makedirs(_LOCAL_LOGS_LOCATION + "/" + fioWorkload.fileSize)
     except FileExistsError:
       pass
-    print(f"Download FIO output from the folder {folder}...")
+
+    print(f"Downloading FIO outputs from {fioWorkload.bucket}...")
     result = subprocess.run(
         [
-            "gsutil",
-            "-m",
+            "gcloud",
+            "-q",  # ignore prompts
+            "storage",
            "cp",
            "-r",
-            f"gs://{folder}/fio-output",
-            LOCAL_LOGS_LOCATION + "/" + fileSize,
+            "--no-user-output-enabled",  # do not print names of objects being copied
+            f"gs://{fioWorkload.bucket}/fio-output",
+            _LOCAL_LOGS_LOCATION + "/" + fioWorkload.fileSize,
        ],
        capture_output=False,
        text=True,
@@ -74,12 +71,53 @@
     if result.returncode < 0:
       print(f"failed to fetch FIO output, error: {result.stderr}")
 
+
+if __name__ == "__main__":
+  parser = argparse.ArgumentParser(
+      prog="FIO test output parser",
+      description=(
+          "This program takes in a json workload configuration file and parses"
+          " it for valid FIO workloads and the locations of their test outputs"
+          " on GCS. It downloads each such output object locally to"
+          " {_LOCAL_LOGS_LOCATION} and parses them for FIO test runs, and then"
+          " dumps their output metrics into a CSV report file."
+      ),
+  )
+  parser.add_argument(
+      "--workload-config",
+      help=(
+          "A json configuration file to define workloads that were run to"
+          " generate the outputs that should be parsed."
+      ),
+      required=True,
+  )
+  parser.add_argument(
+      "--project-number",
+      help=(
+          "project-number (e.g. 93817472919) is needed to fetch the cpu/memory"
+          " utilization data from GCP."
+      ),
+      required=True,
+  )
+  args = parser.parse_args()
+
+  try:
+    os.makedirs(_LOCAL_LOGS_LOCATION)
+  except FileExistsError:
+    pass
+
+  fioWorkloads = fio_workload.ParseTestConfigForFioWorkloads(
+      args.workload_config
+  )
+  downloadFioOutputs(fioWorkloads)
+
   """
   "{read_type}-{mean_file_size}":
       "mean_file_size": str
       "read_type": str
       "records":
           "local-ssd": [record1, record2, record3, record4]
+          "gcsfuse-generic": [record1, record2, record3, record4]
           "gcsfuse-file-cache": [record1, record2, record3, record4]
           "gcsfuse-no-file-cache": [record1, record2, record3, record4]
   """
@@ -88,9 +126,20 @@
   if not mash_installed:
     print("Mash is not installed, will skip parsing CPU and memory usage.")
 
-  for root, _, files in os.walk(LOCAL_LOGS_LOCATION):
+  for root, _, files in os.walk(_LOCAL_LOGS_LOCATION):
     for file in files:
       per_epoch_output = root + f"/{file}"
+      if not per_epoch_output.endswith(".json"):
+        print(f"ignoring file {per_epoch_output} as it's not a json file")
+        continue
+
+      gcsfuse_mount_options = ""
+      gcsfuse_mount_options_file = root + "/gcsfuse_mount_options"
+      if os.path.isfile(gcsfuse_mount_options_file):
+        with open(gcsfuse_mount_options_file) as f:
+          gcsfuse_mount_options = f.read().strip()
+
+      print(f"Now parsing file {per_epoch_output} ...")
       root_split = root.split("/")
       mean_file_size = root_split[-4]
       scenario = root_split[-2]
@@ -98,7 +147,18 @@
       epoch = int(file.split(".")[0][-1])
 
       with open(per_epoch_output, "r") as f:
-        per_epoch_output_data = json.load(f)
+        try:
+          per_epoch_output_data = json.load(f)
+        except:
+          print(f"failed to json-parse {per_epoch_output}, so skipping it.")
+          continue
+
+      if "global options" not in per_epoch_output_data:
+        print(f"field: 'global options' missing in {per_epoch_output}")
+        continue
+      global_options = per_epoch_output_data["global options"]
+      nrfiles = int(global_options["nrfiles"])
+      numjobs = int(global_options["numjobs"])
 
       key = "-".join([read_type, mean_file_size])
       if key not in output:
@@ -107,6 +167,7 @@
             "read_type": read_type,
             "records": {
                 "local-ssd": [],
+                "gcsfuse-generic": [],
                 "gcsfuse-file-cache": [],
                 "gcsfuse-no-file-cache": [],
             },
@@ -132,11 +193,22 @@
       r["end"] = unix_to_timestamp(per_epoch_output_data["timestamp_ms"])
       if r["scenario"] != "local-ssd" and mash_installed:
         r["lowest_memory"], r["highest_memory"] = get_memory(
-            r["pod_name"], r["start"], r["end"]
+            r["pod_name"],
+            r["start"],
+            r["end"],
+            project_number=args.project_number,
         )
         r["lowest_cpu"], r["highest_cpu"] = get_cpu(
-            r["pod_name"], r["start"], r["end"]
+            r["pod_name"],
+            r["start"],
+            r["end"],
+            project_number=args.project_number,
         )
+        pass
+      r["gcsfuse_mount_options"] = gcsfuse_mount_options
+      r["blockSize"] = bs
+      r["filesPerThread"] = nrfiles
+      r["numThreads"] = numjobs
 
       pprint.pprint(r)
 
@@ -145,42 +217,63 @@
       output[key]["records"][scenario][epoch - 1] = r
 
-  output_order = [
-      "read-64K",
-      "read-128K",
-      "read-1M",
-      "read-100M",
-      "read-200G",
-      "randread-1M",
-      "randread-100M",
-      "randread-200G",
+  scenario_order = [
+      "local-ssd",
+      "gcsfuse-generic",
+      "gcsfuse-no-file-cache",
+      "gcsfuse-file-cache",
   ]
-  scenario_order = ["local-ssd", "gcsfuse-no-file-cache", "gcsfuse-file-cache"]
 
   output_file = open("./output.csv", "a")
   output_file.write(
-      "File Size,Read Type,Scenario,Epoch,Duration (s),Throughput"
-      " (MB/s),IOPS,Throughput over Local SSD (%),GCSFuse Lowest Memory"
-      " (MB),GCSFuse Highest Memory (MB),GCSFuse Lowest CPU (core),GCSFuse"
-      " Highest CPU (core),Pod,Start,End\n"
+      "File Size,Read Type,Scenario,Epoch,Duration"
+      " (s),Throughput (MB/s),IOPS,Throughput over Local SSD (%),GCSFuse Lowest"
+      " Memory (MB),GCSFuse Highest Memory (MB),GCSFuse Lowest CPU"
+      " (core),GCSFuse Highest CPU"
+      " (core),Pod,Start,End,GcsfuseMountOptions,BlockSize,FilesPerThread,NumThreads\n"
   )
 
-  for key in output_order:
-    if key not in output:
-      continue
+  for key in output:
     record_set = output[key]
 
     for scenario in scenario_order:
       for i in range(len(record_set["records"][scenario])):
-        r = record_set["records"][scenario][i]
-        r["throughput_over_local_ssd"] = round(
-            r["throughput_mb_per_second"]
-            / record_set["records"]["local-ssd"][i]["throughput_mb_per_second"]
-            * 100,
-            2,
-        )
-        output_file.write(
-            f"{record_set['mean_file_size']},{record_set['read_type']},{scenario},{r['epoch']},{r['duration']},{r['throughput_mb_per_second']},{r['IOPS']},{r['throughput_over_local_ssd']},{r['lowest_memory']},{r['highest_memory']},{r['lowest_cpu']},{r['highest_cpu']},{r['pod_name']},{r['start']},{r['end']}\n"
-        )
-
+        if ("local-ssd" in record_set["records"]) and (
+            len(record_set["records"]["local-ssd"])
+            == len(record_set["records"][scenario])
+        ):
+          try:
+            r = record_set["records"][scenario][i]
+            r["throughput_over_local_ssd"] = round(
+                r["throughput_mb_per_second"]
+                / record_set["records"]["local-ssd"][i][
+                    "throughput_mb_per_second"
+                ]
+                * 100,
+                2,
+            )
+          except:
+            print(
+                "failed to parse record-set for throughput_over_local_ssd."
+                f" record: {r}"
+            )
+            continue
+          else:
+            output_file.write(
+                f"{record_set['mean_file_size']},{record_set['read_type']},{scenario},{r['epoch']},{r['duration']},{r['throughput_mb_per_second']},{r['IOPS']},{r['throughput_over_local_ssd']},{r['lowest_memory']},{r['highest_memory']},{r['lowest_cpu']},{r['highest_cpu']},{r['pod_name']},{r['start']},{r['end']},\"{r['gcsfuse_mount_options']}\",{r['blockSize']},{r['filesPerThread']},{r['numThreads']}\n"
+            )
+        else:
+          try:
+            r = record_set["records"][scenario][i]
+            r["throughput_over_local_ssd"] = "NA"
+          except:
+            print(
+                "failed to parse record-set for throughput_over_local_ssd."
+                f" record: {r}"
+            )
+            continue
+          else:
+            output_file.write(
+                f"{record_set['mean_file_size']},{record_set['read_type']},{scenario},{r['epoch']},{r['duration']},{r['throughput_mb_per_second']},{r['IOPS']},{r['throughput_over_local_ssd']},{r['lowest_memory']},{r['highest_memory']},{r['lowest_cpu']},{r['highest_cpu']},{r['pod_name']},{r['start']},{r['end']},\"{r['gcsfuse_mount_options']}\",{r['blockSize']},{r['filesPerThread']},{r['numThreads']}\n"
+            )
   output_file.close()
diff --git a/perfmetrics/scripts/testing_on_gke/examples/utils/utils.py b/perfmetrics/scripts/testing_on_gke/examples/utils/utils.py
index fa89a8c801..766a5c8a38 100644
--- a/perfmetrics/scripts/testing_on_gke/examples/utils/utils.py
+++ b/perfmetrics/scripts/testing_on_gke/examples/utils/utils.py
@@ -32,7 +32,9 @@ def is_mash_installed() -> bool:
   return False
 
 
-def get_memory(pod_name: str, start: str, end: str) -> Tuple[int, int]:
+def get_memory(
+    pod_name: str, start: str, end: str, project_number: int
+) -> Tuple[int, int]:
   # for some reason, the mash filter does not always work, so we fetch all the metrics for all the pods and filter later.
  result = subprocess.run(
      [
@@ -42,7 +44,7 @@ def get_memory(pod_name: str, start: str, end: str) -> Tuple[int, int]:
          (
              "Query(Fetch(Raw('cloud.kubernetes.K8sContainer',"
              " 'kubernetes.io/container/memory/used_bytes'), {'project':"
-              " '641665282868', 'metric:memory_type': 'non-evictable'})|"
+              f" '{project_number}', 'metric:memory_type': 'non-evictable'}})|"
              " Window(Align('10m'))| GroupBy(['pod_name', 'container_name'],"
              f" Max()), TimeInterval('{start}', '{end}'), '5s')"
          ),
@@ -55,6 +57,8 @@ def get_memory(pod_name: str, start: str, end: str) -> Tuple[int, int]:
   data_points_by_pod_container = result.stdout.strip().split("\n")
   for data_points in data_points_by_pod_container[1:]:
     data_points_split = data_points.split(",")
+    if len(data_points_split) < 6:
+      continue
     pn = data_points_split[4]
     container_name = data_points_split[5]
     if pn == pod_name and container_name == "gke-gcsfuse-sidecar":
@@ -74,7 +78,9 @@ def get_memory(pod_name: str, start: str, end: str) -> Tuple[int, int]:
   )
 
 
-def get_cpu(pod_name: str, start: str, end: str) -> Tuple[float, float]:
+def get_cpu(
+    pod_name: str, start: str, end: str, project_number: int
+) -> Tuple[float, float]:
   # for some reason, the mash filter does not always work, so we fetch all the metrics for all the pods and filter later.
   result = subprocess.run(
       [
@@ -84,9 +90,9 @@ def get_cpu(pod_name: str, start: str, end: str) -> Tuple[float, float]:
          (
              "Query(Fetch(Raw('cloud.kubernetes.K8sContainer',"
              " 'kubernetes.io/container/cpu/core_usage_time'), {'project':"
-              " '641665282868'})| Window(Rate('10m'))| GroupBy(['pod_name',"
-              f" 'container_name'], Max()), TimeInterval('{start}', '{end}'),"
-              " '5s')"
+              f" '{project_number}'}})| Window(Rate('10m'))|"
+              " GroupBy(['pod_name', 'container_name'], Max()),"
+              f" TimeInterval('{start}', '{end}'), '5s')"
          ),
      ],
      capture_output=True,
      text=True,
  )
@@ -97,6 +103,8 @@
   data_points_by_pod_container = result.stdout.split("\n")
   for data_points in data_points_by_pod_container[1:]:
     data_points_split = data_points.split(",")
+    if len(data_points_split) < 6:
+      continue
     pn = data_points_split[4]
     container_name = data_points_split[5]
     if pn == pod_name and container_name == "gke-gcsfuse-sidecar":
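
Usage note (illustrative sketch, not part of the patch): with this change both parsers are driven entirely by a workload configuration file and a project number. One way to run them, assuming the working directory is perfmetrics/scripts/testing_on_gke/examples and that the config filename and project number below are placeholders rather than values taken from this patch:

# Sketch: run both updated parsers with the new required arguments.
# "workloads.json" and the project number are illustrative placeholders.
import os
import subprocess

workload_config = os.path.abspath("workloads.json")  # hypothetical config file
project_number = "93817472919"  # example value from the --project-number help text

for script_dir in ("dlio", "fio"):
  subprocess.run(
      [
          "python3",
          "parse_logs.py",
          "--workload-config",
          workload_config,
          "--project-number",
          project_number,
      ],
      cwd=script_dir,  # each parser resolves ../ and ../../bin relative to its own directory
      check=True,
  )

Each invocation appends its parsed metrics to an output.csv inside the respective parser's directory, per the open("./output.csv", "a") calls above.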