diff --git a/app/src/components/app.js b/app/src/components/app.js
index c0851e8a..1adc3f01 100644
--- a/app/src/components/app.js
+++ b/app/src/components/app.js
@@ -54,9 +54,9 @@ export default {
 	sockets: {
 		config(data) {
 			this.data = JSON.parse(data);
-			this.runCounts = this.data.dataset_names.length;
+			this.runCounts = this.data["properties"]["runs"].length;
 			// set the data for runtime.
-			for (let dataset of this.data.dataset_names) {
+			for (let dataset of this.data["properties"]["runs"]) {
 				this.runtime.push({
 					dataset: dataset,
 					min_inclusive_runtime: this.data.minIncTime[dataset],
diff --git a/app/src/components/callflowEnsemble.js b/app/src/components/callflowEnsemble.js
index 70d86bba..52644649 100644
--- a/app/src/components/callflowEnsemble.js
+++ b/app/src/components/callflowEnsemble.js
@@ -280,8 +280,8 @@ export default {
 	setupStore(data) {
 		data = JSON.parse(data);
 		console.log("Config file: ", data);
-		this.$store.numOfRuns = data["datasets"].length;
-		this.$store.selectedDatasets = data["names"];
+		this.$store.numOfRuns = data["properties"]["runs"].length;
+		this.$store.selectedDatasets = data["properties"]["runs"];
 		this.selectedCaseStudy = data["runName"];
 		this.datasets = this.$store.selectedDatasets;
diff --git a/app/src/components/callflowSingle.js b/app/src/components/callflowSingle.js
index e98d5288..398efaf7 100644
--- a/app/src/components/callflowSingle.js
+++ b/app/src/components/callflowSingle.js
@@ -259,8 +259,8 @@ export default {
 	setupStore(data) {
 		data = JSON.parse(data);
 		console.log("Config file: ", data);
-		this.$store.numOfRuns = data["datasets"].length;
-		this.$store.selectedDatasets = data["names"];
+		this.$store.numOfRuns = data["properties"]["runs"].length;
+		this.$store.selectedDatasets = data["properties"]["runs"];
 		this.datasets = this.$store.selectedDatasets;
 
 		// Enable diff mode only if the number of datasets >= 2
diff --git a/callflow/callflow.py b/callflow/callflow.py
index 689c7e18..431863be 100644
--- a/callflow/callflow.py
+++ b/callflow/callflow.py
@@ -18,17 +18,15 @@
 class CallFlow:
-    def __init__(self, config, ensemble=False):
+    def __init__(self, config: dict, ensemble=False):
         """
         Entry interface to access CallFlow's functionalities.
         """
 
         # Assert if config is provided.
-        assert isinstance(config, callflow.operations.ConfigFileReader)
-
-        # Convert config json to props. Never touch self.config ever.
-        self.props = json.loads(json.dumps(config, default=lambda o: o.__dict__))
+        assert isinstance(config, dict)
+        self.config = config
         self.ensemble = ensemble
 
     # --------------------------------------------------------------------------
@@ -37,19 +35,17 @@ def _create_dot_callflow_folder(self):
         """
         Create a .callflow directory and empty files.
""" - LOGGER.debug(f"Saved .callflow directory is: {self.props['save_path']}") + LOGGER.debug(f"Saved .callflow directory is: {self.config['save_path']}") - if not os.path.exists(self.props["save_path"]): - os.makedirs(self.props["save_path"]) - os.makedirs(os.path.join(self.props["save_path"], "ensemble")) + if not os.path.exists(self.config["save_path"]): + os.makedirs(self.config["save_path"]) + os.makedirs(os.path.join(self.config["save_path"], "ensemble")) - dataset_folders = [] - for dataset in self.props["datasets"]: - dataset_folders.append(dataset["name"]) + dataset_folders = [k for k in self.config["properties"]["paths"].keys()] + # for dataset in self.config["properties"][""]: + # dataset_folders.append(self.config["properties"]["name"]) dataset_folders.append("ensemble") for dataset in dataset_folders: - dataset_dir = os.path.join(self.props["save_path"], dataset) + dataset_dir = os.path.join(self.config["save_path"], dataset) LOGGER.debug(dataset_dir) if not os.path.exists(dataset_dir): # if self.debug: @@ -72,20 +70,20 @@ def process(self): """ Process the datasets based on the format (i.e., either single or ensemble) """ - ndatasets = len(self.props["dataset_names"]) + ndatasets = len(self.config["properties"]["runs"]) assert self.ensemble == (ndatasets > 1) self._create_dot_callflow_folder() if self.ensemble: - self._process_ensemble(self.props["dataset_names"]) + self._process_ensemble(self.config["properties"]["runs"]) else: - self._process_single(self.props["dataset_names"][0]) + self._process_single(self.config["properties"]["runs"][0]) def load(self): """ Load the processed datasets by the format. """ - ndatasets = len(self.props["dataset_names"]) + ndatasets = len(self.config["properties"]["runs"]) if self.ensemble: self.supergraphs = self._read_ensemble() # assertion here is 1 less than self.supergraph.keys, becasuse @@ -95,18 +93,18 @@ def load(self): self.supergraphs = self._read_single() assert len(self.supergraphs.keys()) == 1 - # Adds basic information to props. - # Props is later return to client app on "init" request. - self.add_basic_info_to_props() + # Adds basic information to config. + # Config is later return to client app on "init" request. + self.add_basic_info_to_config() def _process_single(self, dataset): """ Single dataset processing. """ - supergraph = SuperGraph(props=self.props, tag=dataset, mode="process") - LOGGER.info("#########################################") - LOGGER.info(f"Run: {dataset}") - LOGGER.info("#########################################") + LOGGER.debug("#########################################") + LOGGER.debug(f"Single Mode: {dataset}") + LOGGER.debug("#########################################") + supergraph = SuperGraph(config=self.config, tag=dataset, mode="process") # Process each graphframe. supergraph.process_gf() @@ -132,12 +130,12 @@ def _process_ensemble(self, datasets): single_supergraphs = {} for idx, dataset_name in enumerate(datasets): # Create an instance of dataset. + LOGGER.debug("#########################################") + LOGGER.debug(f"Ensemble Mode: {dataset_name}") + LOGGER.debug("#########################################") single_supergraphs[dataset_name] = SuperGraph( - props=self.props, tag=dataset_name, mode="process" + config=self.config, tag=dataset_name, mode="process" ) - LOGGER.info("#########################################") - LOGGER.info(f"Run: {dataset_name}") - LOGGER.info("#########################################") # Process each graphframe. 
single_supergraphs[dataset_name].process_gf() @@ -156,7 +154,7 @@ def _process_ensemble(self, datasets): # Create a supergraph class for ensemble case. ensemble_supergraph = EnsembleGraph( - self.props, "ensemble", mode="process", supergraphs=single_supergraphs + self.config, "ensemble", mode="process", supergraphs=single_supergraphs ) # Write the graphframe to file. @@ -178,7 +176,7 @@ def _process_ensemble(self, datasets): ensemble_supergraph.ensemble_auxiliary( # MPIBinCount=self.currentMPIBinCount, # RunBinCount=self.currentRunBinCount, - datasets=self.props["dataset_names"], + datasets=self.config["properties"]["runs"], MPIBinCount=20, RunBinCount=20, process=True, @@ -191,9 +189,9 @@ def _read_single(self): """ supergraphs = {} # Only consider the first dataset from the listing. - dataset_name = self.props["dataset_names"][0] + dataset_name = self.config["properties"]["runs"][0] supergraphs[dataset_name] = SuperGraph( - props=self.props, tag=dataset_name, mode="render" + config=self.config, tag=dataset_name, mode="render" ) return supergraphs @@ -204,16 +202,16 @@ def _read_ensemble(self): """ supergraphs = {} - for idx, dataset_name in enumerate(self.props["dataset_names"]): + for idx, dataset_name in enumerate(self.config["properties"]["runs"]): supergraphs[dataset_name] = SuperGraph( - self.props, dataset_name, mode="render" + config=self.config, tag=dataset_name, mode="render" ) - # supergraphs[dataset_name].read_gf(read_parameter=self.props["read_parameter"]) + # supergraphs[dataset_name].read_gf(read_parameter=self.config["read_parameter"]) supergraphs["ensemble"] = EnsembleGraph( - props=self.props, tag="ensemble", mode="render" + config=self.config, tag="ensemble", mode="render" ) - # supergraphs["ensemble"].read_gf(read_parameter=self.props["read_parameter"]) + # supergraphs["ensemble"].read_gf(read_parameter=self.config["read_parameter"]) # supergraphs["ensemble"].read_auxiliary_data() return supergraphs @@ -221,42 +219,42 @@ def _read_ensemble(self): # Reading and rendering methods. # All the functions below are Public methods that are accessed by the server. - def add_basic_info_to_props(self): + def add_basic_info_to_config(self): """ - Adds basic information (like max, min inclusive and exclusive runtime) to self.props. + Adds basic information (like max, min inclusive and exclusive runtime) to self.config. 
""" - self.props["maxIncTime"] = {} - self.props["maxExcTime"] = {} - self.props["minIncTime"] = {} - self.props["minExcTime"] = {} - self.props["numOfRanks"] = {} + self.config["maxIncTime"] = {} + self.config["maxExcTime"] = {} + self.config["minIncTime"] = {} + self.config["minExcTime"] = {} + self.config["numOfRanks"] = {} maxIncTime = 0 maxExcTime = 0 minIncTime = 0 minExcTime = 0 for idx, tag in enumerate(self.supergraphs): - self.props["maxIncTime"][tag] = ( + self.config["maxIncTime"][tag] = ( self.supergraphs[tag].gf.df["time (inc)"].max() ) - self.props["maxExcTime"][tag] = self.supergraphs[tag].gf.df["time"].max() - self.props["minIncTime"][tag] = ( + self.config["maxExcTime"][tag] = self.supergraphs[tag].gf.df["time"].max() + self.config["minIncTime"][tag] = ( self.supergraphs[tag].gf.df["time (inc)"].min() ) - self.props["minExcTime"][tag] = self.supergraphs[tag].gf.df["time"].min() - # self.props["numOfRanks"][dataset] = len( + self.config["minExcTime"][tag] = self.supergraphs[tag].gf.df["time"].min() + # self.config["numOfRanks"][dataset] = len( # self.datasets[dataset].gf.df["rank"].unique() # ) - maxExcTime = max(self.props["maxExcTime"][tag], maxExcTime) - maxIncTime = max(self.props["maxIncTime"][tag], maxIncTime) - minExcTime = min(self.props["minExcTime"][tag], minExcTime) - minIncTime = min(self.props["minIncTime"][tag], minIncTime) - # maxNumOfRanks = max(self.props["numOfRanks"][dataset], maxNumOfRanks) - - self.props["maxIncTime"]["ensemble"] = maxIncTime - self.props["maxExcTime"]["ensemble"] = maxExcTime - self.props["minIncTime"]["ensemble"] = minIncTime - self.props["minExcTime"]["ensemble"] = minExcTime - # self.props["numOfRanks"]["ensemble"] = maxNumOfRanks + maxExcTime = max(self.config["maxExcTime"][tag], maxExcTime) + maxIncTime = max(self.config["maxIncTime"][tag], maxIncTime) + minExcTime = min(self.config["minExcTime"][tag], minExcTime) + minIncTime = min(self.config["minIncTime"][tag], minIncTime) + # maxNumOfRanks = max(self.config["numOfRanks"][dataset], maxNumOfRanks) + + self.config["maxIncTime"]["ensemble"] = maxIncTime + self.config["maxExcTime"]["ensemble"] = maxExcTime + self.config["minIncTime"]["ensemble"] = minIncTime + self.config["minExcTime"]["ensemble"] = minExcTime + # self.config["numOfRanks"]["ensemble"] = maxNumOfRanks def request_single(self, operation): """ @@ -278,7 +276,7 @@ def request_single(self, operation): operation_name = operation["name"] if operation_name == "init": - return self.props + return self.config elif operation_name == "auxiliary": return self.supergraphs[operation["dataset"]].auxiliary_data @@ -312,10 +310,10 @@ def request_ensemble(self, operation): Handles all the socket requests connected to Single CallFlow. 
""" operation_name = operation["name"] - datasets = self.props["dataset_names"] + datasets = self.config["properties"]["runs"] if operation_name == "init": - return self.props + return self.config elif operation_name == "ensemble_cct": result = NodeLinkLayout( diff --git a/callflow/datastructures/ensemblegraph.py b/callflow/datastructures/ensemblegraph.py index 182f51a1..b790c9ec 100644 --- a/callflow/datastructures/ensemblegraph.py +++ b/callflow/datastructures/ensemblegraph.py @@ -24,14 +24,14 @@ class EnsembleGraph(SuperGraph): """ # -------------------------------------------------------------------------- - def __init__(self, props={}, tag="", mode="process", supergraphs={}): + def __init__(self, config={}, tag="", mode="process", supergraphs={}): """ Arguments: supergraphs (dict): dictionary of supergraphs keyed by their tag. """ self.supergraphs = supergraphs - super().__init__(props, tag, mode) + super().__init__(config, tag, mode) # -------------------------------------------------------------------------- def create_gf(self): diff --git a/callflow/datastructures/graphframe.py b/callflow/datastructures/graphframe.py index a1e1087d..0c894024 100644 --- a/callflow/datastructures/graphframe.py +++ b/callflow/datastructures/graphframe.py @@ -116,11 +116,14 @@ def from_config(config, name): LOGGER.info(f"Creating graphframes: {name}") LOGGER.info(f"Data path: {config['data_path']}") - data_path = os.path.join(config["data_path"], config["paths"][name]) - if config["format"][name] == "hpctoolkit": + data_path = os.path.join( + config["data_path"], config["properties"]["paths"][name] + ) + profile_format = config["properties"]["format"][name] + if profile_format == "hpctoolkit": gf = ht.GraphFrame.from_hpctoolkit(data_path) - elif config["format"][name] == "caliper": + elif profile_format == "caliper": grouping_attribute = "function" default_metric = "sum(sum#time.duration), inclusive_sum(sum#time.duration)" query = "select function,%s group by %s format json-split" % ( @@ -129,16 +132,16 @@ def from_config(config, name): ) gf = ht.GraphFrame.from_caliper(data_path, query=query) - elif config["format"][name] == "caliper_json": - gf = ht.GraphFrame.from_caliper(data_path, query="") + elif profile_format == "caliper-json": + gf = ht.GraphFrame.from_caliper_json(data_path) - elif config["format"][name] == "gprof": + elif profile_format == "gprof": gf = ht.GraphFrame.from_gprof_dot(data_path) - elif config["format"][name] == "literal": + elif profile_format == "literal": gf = ht.GraphFrame.from_literal(config["data_path"]) - elif config["format"][name] == "lists": + elif profile_format == "lists": gf = ht.GraphFrame.from_lists(config["data_path"]) return GraphFrame.from_hatchet(gf) @@ -155,8 +158,8 @@ def hatchet_graph_to_nxg(ht_graph): def _get_node_name(nd): nm = callflow.utils.sanitize_name(nd["name"]) - if nd["line"] != "NA": - nm += ":" + str(nd["line"]) + if nd.get("line") != "NA" and nd.get("line") is not None: + nm += ":" + str(nd.get("line")) return nm # `node_dict_from_frame` converts the hatchet's frame to a dictionary diff --git a/callflow/datastructures/supergraph.py b/callflow/datastructures/supergraph.py index ceb851ba..2a652f26 100644 --- a/callflow/datastructures/supergraph.py +++ b/callflow/datastructures/supergraph.py @@ -25,7 +25,7 @@ class SuperGraph(object): _FILENAMES = {"params": "env_params.txt", "aux": "auxiliary_data.json"} # -------------------------------------------------------------------------- - def __init__(self, props={}, tag="", mode="process"): + def 
__init__(self, config={}, tag="", mode="process"):
         """
         Arguments:
-            props (dict): dictionary to store the configuration. CallFlow appends more information while processing.
+            config (dict): dictionary to store the configuration. CallFlow appends more information while processing.
@@ -35,7 +35,7 @@ def __init__(self, props={}, tag="", mode="process"):
         """
         assert mode in ["process", "render"]
 
         self.timer = Timer()
-        self.props = props
+        self.props = config
         self.dirname = os.path.join(self.props["save_path"], tag)
         self.tag = tag
         self.mode = mode
@@ -92,7 +93,8 @@ def process_gf(self):
         Note: Process class follows a builder pattern (refer: https://en.wikipedia.org/wiki/Builder_pattern).
         """
-        if self.props["format"][self.tag] == "hpctoolkit":
+        profile_format = self.props["properties"]["format"][self.tag]
+        if profile_format == "hpctoolkit":
 
             process = (
                 Process.Builder(self.gf, self.tag)
@@ -106,26 +108,36 @@ def process_gf(self):
                 .build()
             )
 
-        elif (
-            self.props["format"][self.tag] == "caliper_json"
-            or self.props["format"][self.tag] == "caliper"
-        ):
-
-            process = (
-                Process.Builder(self.gf, self.tag)
-                .add_time_columns()
-                .add_rank_column()
-                .add_callers_and_callees()
-                .add_dataset_name()
-                .add_imbalance_perc()
-                .add_module_name_caliper(self.props["callsite_module_map"])
-                .create_name_module_map()
-                .add_vis_node_name()
-                .add_path()
-                .build()
-            )
+        elif profile_format == "caliper-json" or profile_format == "caliper":
+            builder = (
+                Process.Builder(self.gf, self.tag)
+                .add_time_columns()
+                .add_rank_column()
+                .add_callers_and_callees()
+                .add_dataset_name()
+                .add_imbalance_perc()
+            )
+            # Map callsites to modules only when a mapping is provided.
+            if "callsite_module_map" in self.props:
+                builder = builder.add_module_name_caliper(
+                    self.props["callsite_module_map"]
+                )
+            process = (
+                builder.create_name_module_map()
+                .add_vis_node_name()
+                .add_path()
+                .build()
+            )
 
-        elif self.props["format"][self.tag] == "gprof":
+        elif profile_format == "gprof":
             process = (
                 Process.Builder(self.gf, self.tag)
                 .add_nid_column()
diff --git a/callflow/modules/auxiliary.py b/callflow/modules/auxiliary.py
index e6dc72bc..5add71fd 100644
--- a/callflow/modules/auxiliary.py
+++ b/callflow/modules/auxiliary.py
@@ -38,7 +38,7 @@ def __init__(
         self.RunBinCount = RunBinCount
         self.timer = Timer()
         self.props = props
-        self.datasets = self.props["dataset_names"]
+        self.datasets = self.props["properties"]["runs"]
         self.df = self.select_rows(self.gf.df, self.datasets)
         self.process = process
         self.hist_props = ["rank", "name", "dataset", "all_ranks"]
diff --git a/callflow/operations/__init__.py b/callflow/operations/__init__.py
index a4faee18..cd46aa2c 100644
--- a/callflow/operations/__init__.py
+++ b/callflow/operations/__init__.py
@@ -7,5 +7,6 @@
 from .group import Group
 from .filter import Filter
 from .read_config import ConfigFileReader
+from .argparser import ArgParser
 
-__all__ = ["Process", "Group", "Filter", "ConfigFileReader"]
+__all__ = ["Process", "Group", "Filter", "ConfigFileReader", "ArgParser"]
diff --git a/callflow/operations/argparser.py b/callflow/operations/argparser.py
new file mode 100644
index 00000000..a5323351
--- /dev/null
+++ b/callflow/operations/argparser.py
@@ -0,0 +1,386 @@
+# Copyright 2017-2020 Lawrence Livermore National Security, LLC and other
+# CallFlow Project Developers. See the top-level LICENSE file for details.
+#
+# SPDX-License-Identifier: MIT
+
+import os
+import jsonschema
+import argparse
+
+import callflow
+
+LOGGER = callflow.get_logger(__name__)
+
+schema = {
+    "type": "object",
+    "properties": {
+        "data_path": {"type": "string"},
+        "experiment": {"type": "string"},
+        "save_path": {"type": "string"},
+        "read_parameter": {"type": "boolean"},
+        "properties": {"type": "object"},
+    },
+}
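+
+# For illustration, a document that validates against this schema (the shape
+# assumed by the rest of this patch) looks like:
+#   {"data_path": "data/lulesh", "experiment": "lulesh", "save_path": "...",
+#    "read_parameter": false,
+#    "properties": {"runs": [...], "paths": {...}, "format": {...}}}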
+
+
+class ArgParser:
+    """
+    Read the input arguments and, optionally, a config file.
+    The config file contains the information to process the datasets accordingly.
+    """
+
+    def __init__(self):
+
+        _READ_MODES = {
+            "config": ArgParser._read_config,
+            "directory": ArgParser._read_directory,
+            "gfs": ArgParser._read_gfs,
+        }
+
+        # Parse the arguments passed.
+        args = self._create_parser()
+
+        # Verify if only valid things are passed.
+        # The read mode determines how the arguments will be consumed by CallFlow.
+        read_mode = self._verify_parser(args)
+        LOGGER.debug(f"Read mode: {read_mode}")
+
+        # Check if the read mode is one of the keys of _READ_MODES.
+        assert read_mode in _READ_MODES.keys()
+
+        self.arguments = _READ_MODES[read_mode](args)
+
+        # Add read_mode to the arguments.
+        self.arguments["read_mode"] = read_mode
+
+        # Add process to the arguments.
+        self.arguments["process"] = args.process
+
+        # Validate the resulting json.
+        jsonschema.validate(instance=self.arguments, schema=schema)
+
+        LOGGER.debug(f"Arguments: {self.arguments}")
+
+    def get_arguments(self):
+        return self.arguments
+
+    # Private methods.
+    @staticmethod
+    def _create_parser():
+        """
+        Parse the input arguments.
+        """
+        parser = argparse.ArgumentParser()
+        parser.add_argument(
+            "--verbose", action="store_true", help="Display debug points"
+        )
+        parser.add_argument("--config", help="Config file to be processed.")
+        parser.add_argument("--data_dir", help="Input directory to be processed.")
+        # Referenced by _verify_parser below; the gfs reader is not implemented yet.
+        parser.add_argument("--gfs", help="Graphframes to be processed.")
+        parser.add_argument(
+            "--process",
+            action="store_true",
+            help="Process mode. To preprocess at the required level of granularity, use the options --filter_by, --filter_perc, and --group_by.",
+        )
+        parser.add_argument(
+            "--profile_format",
+            help="Profile format: hpctoolkit | caliper | caliper_json",
+        )
+        parser.add_argument("--production", help="Launch app on production server.")
+        parser.add_argument("--filter_perc", help="Set the filter percentage.")
+        parser.add_argument(
+            "--filter_by", help="Set the filter column (e.g., 'time' or 'time (inc)')."
+        )
+        parser.add_argument(
+            "--group_by",
+            help="Set the group-by column (e.g., grouping by 'name' gets a call graph, and grouping by 'module' produces a super graph).",
+        )
+        parser.add_argument("--save_path", help="Save path for the processed files.")
+        parser.add_argument(
+            "--read_parameter", help="Enable parameter analysis", action="store_true"
+        )
+
+        args = parser.parse_args()
+        return args
+
+    @staticmethod
+    def _verify_parser(args: argparse.Namespace):
+        """
+        Verify the input arguments.
+
+        Raises exceptions if something is not provided.
+        Checks if the config file is provided and exists.
+
+        Parameters
+        ----------
+        args : argparse.Namespace
+            Arguments passed by the user.
+
+        Returns
+        -------
+        read_mode: 'config' or 'directory' or 'gfs'
+            Read mode with which CallFlow will consume the data.
+        """
+        if not args.config and not args.data_dir and not args.gfs:
+            s = "Please provide a config file (or) a data directory (or) pass in the graphframes. To see options, use --help."
+            raise Exception(s)
+
+        if args.config:
+            read_mode = "config"
+            if not os.path.isfile(args.config):
+                s = "Config file ({}) not found!".format(args.config)
+                raise Exception(s)
+
+        elif args.data_dir:
+            read_mode = "directory"
+
+        elif args.gfs:
+            read_mode = "gfs"
+
+        return read_mode
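+
+    # The three read modes correspond to invocations like the following
+    # (illustrative paths; only the mode-selecting flag is required):
+    #   config:    python3 server/main.py --config data/lulesh/callflow.config.json
+    #   directory: python3 server/main.py --data_dir data/lulesh --profile_format hpctoolkit
+    #   gfs:       --gfs with in-memory graphframes (reader not implemented yet)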
+
+    @staticmethod
+    def _read_config(args: argparse.Namespace):
+        scheme = {}
+        f = open(args.config, "r").read()
+        config_json = callflow.utils.jsonify_string(f)
+
+        # Validate the input json.
+        jsonschema.validate(instance=config_json, schema=schema)
+
+        # Set the data_path, which is the data directory.
+        scheme["data_path"] = os.path.dirname(args.config)
+
+        # Set the experiment name.
+        if "experiment" not in config_json:
+            scheme["experiment"] = scheme["data_path"].split("/")[-1]
+        else:
+            scheme["experiment"] = config_json["experiment"]
+
+        # Set the save_path; if not provided, it defaults to ${data_path}/.callflow.
+        if "save_path" not in config_json:
+            scheme["save_path"] = os.path.abspath(
+                os.path.join(scheme["data_path"], ".callflow")
+            )
+        else:
+            scheme["save_path"] = os.path.abspath(config_json["save_path"])
+
+        # Set read_parameter, if provided.
+        if "read_parameter" not in config_json:
+            scheme["read_parameter"] = False
+        else:
+            scheme["read_parameter"] = config_json["read_parameter"]
+
+        # Set the properties key, according to the format.
+        _SCHEME_PROFILE_FORMAT_MAPPER = {
+            "hpctoolkit": ArgParser._scheme_dataset_map_hpctoolkit,
+            "caliper": ArgParser._scheme_dataset_map_caliper,
+            "caliper_json": ArgParser._scheme_dataset_map_caliper_json,
+            "default": ArgParser._scheme_dataset_map_default,
+        }
+        scheme["properties"] = {}
+        if "runs" not in config_json:
+            assert "profile_format" in config_json or args.profile_format
+
+            if "profile_format" in config_json:
+                profile_format = config_json["profile_format"]
+
+            # The command line, when given, overrides the config file.
+            if args.profile_format:
+                profile_format = args.profile_format
+
+            LOGGER.debug(f"Scheme: {profile_format}")
+
+            scheme["properties"] = _SCHEME_PROFILE_FORMAT_MAPPER[profile_format](
+                scheme["data_path"]
+            )
+        else:
+            LOGGER.debug("Scheme: default")
+            scheme["properties"] = _SCHEME_PROFILE_FORMAT_MAPPER["default"](
+                config_json["runs"]
+            )
+
+        if "scheme" in config_json and "module_map" in config_json["scheme"]:
+            scheme["module_callsite_map"] = config_json["scheme"]["module_map"]
+            scheme["callsite_module_map"] = ArgParser._process_module_map(
+                scheme["module_callsite_map"]
+            )
+            scheme["filter_perc"] = config_json["scheme"]["filter_perc"]
+            scheme["filter_by"] = config_json["scheme"]["filter_by"]
+            scheme["group_by"] = config_json["scheme"]["group_by"]
+
+        return scheme
+
+    @staticmethod
+    def _scheme_dataset_map_default(run_props: list):
+        """
+        Derive the scheme when the runs are listed explicitly in the config file.
+        """
+        scheme = {}
+        scheme["runs"] = []
+        scheme["paths"] = {}
+        scheme["format"] = {}
+        # Parse the information for each dataset.
+        for data in run_props:
+            name = data["name"]
+            scheme["runs"].append(name)
+            scheme["paths"][name] = data["path"]
+            scheme["format"][name] = data["profile_format"]
+        return scheme
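+
+    # For illustration, given a config entry (assumed input)
+    #   {"name": "run-1", "path": "run-1", "profile_format": "hpctoolkit"},
+    # _scheme_dataset_map_default above yields
+    #   {"runs": ["run-1"], "paths": {"run-1": "run-1"},
+    #    "format": {"run-1": "hpctoolkit"}}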
+ """ + scheme = {} + scheme["runs"] = [] + scheme["paths"] = {} + scheme["profile_format"] = {} + list_subfolders_with_paths = [ + f.path for f in os.scandir(data_path) if f.is_dir() + ] + + for idx, subfolder_path in enumerate(list_subfolders_with_paths): + name = subfolder_path.split("/")[-1] + if name != ".callflow": + scheme["runs"].append(name) + scheme["paths"][name] = subfolder_path + scheme["profile_format"][name] = "hpctoolkit" + + return scheme + + @staticmethod + def _scheme_dataset_map_caliper(data_path: str): + """ + Derive the scheme for dataset_map for caliper format. + """ + scheme = {} + scheme["runs"] = [] + scheme["paths"] = {} + scheme["profile_format"] = {} + list_cali_paths = [ + f.path + for f in os.scandir(data_path) + if os.path.splitext(f.path)[1] == ".cali" + ] + + for idx, subfolder_path in enumerate(list_cali_paths): + name = subfolder_path.split("/")[-1].split(".")[0] + if name != ".callflow": + scheme["runs"].append(name) + scheme["paths"][name] = subfolder_path + scheme["profile_format"][name] = "caliper" + + return scheme + + @staticmethod + def _scheme_dataset_map_caliper_json(data_path: str): + """ + Derive the scheme for dataset_map for caliper json format. + """ + scheme = {} + scheme["runs"] = [] + scheme["paths"] = {} + scheme["format"] = {} + list_json_paths = [ + f.path + for f in os.scandir(data_path) + if os.path.splitext(f.path)[1] == ".json" + ] + + for idx, subfolder_path in enumerate(list_json_paths): + name = subfolder_path.split("/")[-1] + if name != "callflow.config.json": + filename = name.split(".")[0] + scheme["runs"].append(filename) + scheme["paths"][filename] = subfolder_path + scheme["format"][filename] = "caliper-json" + + return scheme + + @staticmethod + def _read_directory(args): + scheme = {} + + # Set data path + scheme["data_path"] = args.data_dir + + # Set experiement + scheme["experiment"] = scheme["data_path"].split("/")[-1] + + # Set save_path + if args.save_path: + scheme["save_path"] = args.save_path + else: + scheme["save_path"] = os.path.abspath( + os.path.join(scheme["data_path"], ".callflow") + ) + + scheme["read_parameter"] = args.read_parameter + + # Set the datasets key, according to the format. + _SCHEME_PROFILE_FORMAT_MAPPER = { + "hpctoolkit": ArgParser._scheme_dataset_map_hpctoolkit, + "caliper": ArgParser._scheme_dataset_map_caliper, + "caliper_json": ArgParser._scheme_dataset_map_caliper_json, + "default": ArgParser._scheme_dataset_map_default, + } + scheme["properties"] = [] + if "profile_format" in args: + profile_format = args.profile_format + + LOGGER.debug(f"Scheme: {profile_format}") + + scheme["properties"] = _SCHEME_PROFILE_FORMAT_MAPPER[profile_format]( + scheme["data_path"] + ) + else: + scheme["properties"] = _SCHEME_PROFILE_FORMAT_MAPPER["default"]( + scheme["data_path"] + ) + + # Set filter_perc + if args.filter_perc: + scheme["filter_perc"] = args.filter_perc + else: + scheme["filter_perc"] = 0 + + # Set filter_by + if args.filter_by: + scheme["filter_by"] = args.filter_by + else: + scheme["filter_by"] = "time (inc)" + + # Set group_by + if args.group_by: + scheme["group_by"] = args.group_by + else: + scheme["group_by"] = "module" + + return scheme + + @staticmethod + def _read_gfs(self, args): + pass + + @staticmethod + def _process_module_map(module_callsite_map): + """ + Process module mapper file. 
+ """ + ret = {} + for module in module_callsite_map: + callsites = module_callsite_map[module] + for callsite in callsites: + ret[callsite] = module + return ret + + # -------------------------------------------------------------------------- + def __str__(self): + items = ("%s = %r" % (k, v) for k, v in self.__dict__.items()) + return "<%s: {%s}> \n" % (self.__class__.__name__, ", ".join(items)) + + def __repr__(self): + return self.__str__() + + # -------------------------------------------------------------------------- diff --git a/callflow/operations/process.py b/callflow/operations/process.py index f969dc56..8cb6d375 100644 --- a/callflow/operations/process.py +++ b/callflow/operations/process.py @@ -188,11 +188,6 @@ def add_module_name_hpctoolkit(self): ) return self - # def add_node_name_caliper(self, node_module_map): - # self.gf.df["node_name"] = self.gf.df["name"].apply( - # lambda name: name_module_map[name] - # ) - def add_module_name_caliper(self, module_map): self.gf.df["module"] = self.gf.df["name"].apply( lambda name: module_map[name] @@ -233,9 +228,12 @@ def add_time_columns(self): return self def create_name_module_map(self): + if "module" not in self.gf.df.columns: + self.gf.df["module"] = self.gf.df["name"] self.name_module_map = ( self.gf.df.groupby(["name"])["module"].unique().to_dict() ) + return self def raiseExceptionIfNodeCountNotEqual(self, attr): @@ -248,9 +246,3 @@ def raiseExceptionIfNodeCountNotEqual(self, attr): raise Exception( f"Unmatched Preprocessing maps: Map contains: {map_node_count} nodes, graph contains: {df_node_count} nodes" ) - - def logInformation(self): - LOGGER.info(f"CCT node count : {len(self.cct_nodes)}") - LOGGER.info(f"CallGraph node count: {len(self.callgraph_nodes)}") - LOGGER.info(f"SuperGraph node count: {len(self.gf.df['module'].unique())}") - return self diff --git a/callflow/utils.py b/callflow/utils.py index 2671ca71..bd105f63 100644 --- a/callflow/utils.py +++ b/callflow/utils.py @@ -3,8 +3,18 @@ # # SPDX-License-Identifier: MIT -# a similar function in utils/hatchet.py -def sanitize_name(name): +# ------------------------------------------------------------------------------ +# Utility functions used by callflow. +# ------------------------------------------------------------------------------ +import callflow +import hatchet +import json + + +def sanitize_name(name: str): + """ + Sanitize the callsites for general dataset. + """ ret_name = "" if name is None: ret_name = "Unknown" @@ -17,7 +27,10 @@ def sanitize_name(name): return ret_name -def sanitizeAMMName(name): +def sanitizeAMMName(name: str): + """ + Sanitize the callsites for AMM dataset. + """ if "::" in name: name = name.split("::")[-1] else: @@ -25,17 +38,14 @@ def sanitizeAMMName(name): return name -# ------------------------------------------------------------------------------ -def visModuleCallsiteName(name, df): - return df.groupby(["name"]).unique()["module"] - - -def convertStringToList(string): - res = string.strip("][").split(", ") - return res +def convertStringToList(string: str): + """ + Convert a string which is an array to an array + """ + return string.strip("][").split(", ") -def median(arr): +def median(arr: list): """ Returns the median and its index in the array. """ @@ -54,7 +64,7 @@ def median(arr): return median, indices -def avg(arr): +def avg(arr: list): """ Returns the average of the array. Uses floating-point division. 
@@ -70,12 +80,44 @@ def string_to_list(string: str, sep: str):
     return string.strip("][").split(sep)
 
-# ------------------------------------------------------------------------------
-# networkx utilities
-# ------------------------------------------------------------------------------
-# not sure if this is used anywhere
-# Also, why is this not consistent with the rest of the style (ie, actions)
-def dfs(graph, dataframe, limit):
+def jsonify_string(string: str):
+    """
+    Convert a string input to a json object.
+    """
+    assert isinstance(string, str)
+    _ = json.loads(string, object_hook=byteify)
+    return byteify(_, ignore_dicts=True)
+
+
+def byteify(data, ignore_dicts=False):
+    """
+    Recursively decode bytes into str inside nested lists and dicts.
+    """
+    # if this is a bytestring, decode it into its string representation
+    if isinstance(data, bytes):
+        return data.decode("utf-8")
+
+    # if this is a list of values, return a list of byteified values
+    if isinstance(data, list):
+        return [byteify(item, ignore_dicts=True) for item in data]
+
+    # if this is a dictionary, return a dictionary of byteified keys and values,
+    # but only if we haven't already byteified it
+    if isinstance(data, dict) and not ignore_dicts:
+        return {
+            byteify(key, ignore_dicts=True): byteify(value, ignore_dicts=True)
+            for key, value in data.items()
+        }
+    # if it's anything else, return it in its original form
+    return data
+
+
+def dfs(gf: callflow.GraphFrame, limit: int):
+    """
+    Depth-first search over the graph, for debugging purposes.
+    """
+    dataframe = gf.df
+    graph = gf.graph
+
     def _dfs_recurse(root, level):
         for node in root.children:
             result = ""
@@ -88,7 +130,6 @@ def _dfs_recurse(root, level):
             ]
             inclusive_runtime = " time (inc) = " + str(node_df["time (inc)"].mean())
             exclusive_runtime = " time = " + str(node_df["time"].mean())
-            # module = "Module = " + str(node_df['module'].unique()[0])
             module = ""
             result += (
                 "Node = "
@@ -110,8 +151,11 @@ def _dfs_recurse(root, level):
     _dfs_recurse(root, level)
 
-# ------------------------------------------------------------------------------
-def bfs_hatchet(graph):
+def bfs(gf):
+    """
+    Breadth-first search over the graph, for debugging purposes.
+    """
+    graph = gf.graph
     ret = {}
     node_count = 0
     roots = graph.roots
@@ -132,7 +176,10 @@
     return ret
 
-def getNodeCallpath(node):
+def get_node_callpath(node: hatchet.node):
+    """
+    Return the call path for a given callflow.GraphFrame.graph node.
+    """
     ret = []
     list_of_frames = list(node.path())
     for frame in list_of_frames:
@@ -144,11 +191,17 @@
     return ret
 
-def getNodeParents(node):
+def get_node_parents(node: hatchet.node):
+    """
+    Return the parents of a hatchet node.
+    """
     return node.parents
 
-def get_callsite_name_from_frame(node):
+def get_callsite_name_from_frame(node: hatchet.node):
+    """
+    Return the callsite name for a hatchet node.
+    """
     name = node.frame.get("name")
     if name is not None:
         return node.frame.get("name")
@@ -156,21 +209,28 @@
         return node.frame.get("file")
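 
+# Illustrative mapping for node_dict_from_frame (assumed frames): a statement
+# frame {"type": "statement", "file": "lulesh.cc", "line": 121} maps to
+# {"name": "lulesh.cc", "line": 121, "type": "statement"}, while a function
+# frame {"type": "function", "name": "main"} maps to line "NA".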
""" + if frame.get("type") is None and frame["name"] is not None: + return {"name": frame.get("name"), "type": "function"} + if frame["type"] == "function": - return {"name": frame["name"], "line": "NA", "type": "function"} + return {"name": frame.get("name"), "line": "NA", "type": "function"} elif frame["type"] == "statement": - return {"name": frame["file"], "line": frame["line"], "type": "statement"} + return { + "name": frame.get("file"), + "line": frame.get("line"), + "type": "statement", + } elif frame["type"] == "loop": - return {"name": frame["file"], "line": frame["line"], "type": "loop"} + return {"name": frame.get("file"), "line": frame.get("line"), "type": "loop"} else: return {} -def path_list_from_frames(frames): +def path_list_from_frames(frames: list): """ Constructs callsite's path from Hatchet's frame. """ @@ -178,11 +238,13 @@ def path_list_from_frames(frames): for frame in frames: path = [] for f in frame: - if f["type"] == "function": - path.append(f["name"]) - elif f["type"] == "statement": - path.append(f["file"] + ":" + str(f["line"])) - elif f["type"] == "loop": - path.append(f["file"] + ":" + str(f["line"])) + if f.get("type") == "function": + path.append(f.get("name")) + elif f.get("type") == "statement": + path.append(f.get("file") + ":" + str(f.get("line"))) + elif f.get("type") == "loop": + path.append(f.get("file") + ":" + str(f.get("line"))) + else: + path.append(f.get("name")) paths.append(path) return path diff --git a/data/caliper-cali/callflow.config.json b/data/caliper-cali/callflow.config.json index f9e93285..02b818ce 100644 --- a/data/caliper-cali/callflow.config.json +++ b/data/caliper-cali/callflow.config.json @@ -1,8 +1,8 @@ { - "run_name": "caliper-cali", + "experiment": "caliper-cali", "save_path": "./data/caliper-cali/.callflow", "read_parameter": false, - "datasets": [ + "runs": [ { "name": "caliper-ex", "path": "caliper-ex.json", diff --git a/data/caliper-lulesh-json/callflow.config.json b/data/caliper-lulesh-json/callflow.config.json index e841b811..b313c89b 100644 --- a/data/caliper-lulesh-json/callflow.config.json +++ b/data/caliper-lulesh-json/callflow.config.json @@ -1,8 +1,8 @@ { - "run_name": "caliper-lulesh-json", + "experiment": "caliper-lulesh-json", "save_path": "data/caliper-lulesh-json/.callflow", "read_parameter": false, - "datasets": [ + "runs": [ { "name": "lulesh", "path": "lulesh-sample-annotation-profile.json", diff --git a/data/hpctoolkit-cpi-database/callflow.config.json b/data/hpctoolkit-cpi-database/callflow.config.json index b0caa6f6..51bf5ed0 100644 --- a/data/hpctoolkit-cpi-database/callflow.config.json +++ b/data/hpctoolkit-cpi-database/callflow.config.json @@ -1,8 +1,8 @@ { - "run_name": "hpctoolkit-cpi-database", + "experiment": "hpctoolkit-cpi-database", "save_path": "./data/hpctoolkit-cpi-database/.callflow", "read_parameter": false, - "datasets": [ + "runs": [ { "name": "calc-pi", "path": "", diff --git a/docs/examples/callflow.config.json b/docs/examples/callflow.config.json new file mode 100644 index 00000000..0c2c4333 --- /dev/null +++ b/docs/examples/callflow.config.json @@ -0,0 +1,36 @@ +{ + "experiment": "experiment_name", + "save_path": "/path/to/dir", + "read_parameter": false, + "runs": [{ + "name": "run-1", + "path": "/path/to/run-1", + "profile_format": "hpctoolkit | caliper | caliper_json" + }, + { + "name": "run-2", + "path": "/path/to/run-2", + "profile_format": "hpctoolkit | caliper | caliper_json" + }, + { + "name": "run-3", + "path": "/path/to/run-3", + "profile_format": "hpctoolkit | caliper | 
caliper_json" + } + ], + "schema": { + "filter_by": "time (inc)", + "filter_perc": 0, + "group_by": "name", + "module_map": { + "Lulesh": ["main", "lulesh.cycle"], + "LeapFrog": ["LagrangeNodal", "LagrangeLeapFrog"], + "CalcForce": ["CalcForceForNodes", "CalcVolumeForceForElems", "CalcHourglassControlForElems", "CalcFBHourglassForceForElems"], + "CalcLagrange": ["LagrangeElements", "UpdateVolumesForElems", "CalcLagrangeElements", "CalcKinematicsForElems", "CalcQForElems", "CalcMonotonicQGradientsForElems", "CalcMonotonicQRegionForElems", "ApplyMaterialPropertiesForElems", "EvalEOSForElems", "CalcEnergyForElems", "CalcPressureForElems", "CalcSoundSpeedForElems", "IntegrateStressForElems"], + "Timer": ["TimeIncrement"], + "CalcConstraint": ["CalcTimeConstraintsForElems", "CalcCourantConstraintForElems", "CalcHydroConstraintForElems"], + "NA": ["Unknown"], + "MPI": ["MPI_Barrier", "MPI_Reduce", "MPI_Allreduce", "MPI_Irecv", "MPI_Isend", "MPI_Wait", "MPI_Waitall", "MPI_Finalize"] + } + } +} \ No newline at end of file diff --git a/docs/user_guide.rst b/docs/user_guide.rst index 27240a4b..50a54d2d 100644 --- a/docs/user_guide.rst +++ b/docs/user_guide.rst @@ -14,29 +14,95 @@ CallFlow is structured as three components: 3. A python server to support the visualization client -Sample Data ------------ +Sample Datasets +--------------- Sample data and examples are provided in the `data `_ and `examples `_. + +Arguments +--------- + +.. code-block:: console + + --verbose - Display debug points. + (optional, default: false) + + --config - Config file to be processed. + (Either config file or data directory must be provided) + + --data_dir - Input directory to be processed. + (Either config file or data directory must be provided) + + --process - Enable process mode. + (default: false) + + --profile_format - Profile format. + (required, either hpctoolkit | caliper | caliper_json) + + --save_path - Save path for the processed files. + (optional, default: data_dir/.callflow) + + --filter_by - Set filter by column + (optional, e.g., "time" or "time (inc)") + + --filter_perc - Set filter percentage. + (optional, e.g., 10, 20, 30) + + --group_by - Set the semantic level for supergraph + (optional, e.g., module to get super graph, name to get call graph, default: 'module') + + --read_parameter - Enable parameter analysis. + (optional. This is an experimental feature) + +Process datasets +---------------- +First step is to process the raw datasets to use with CallFlow. The processing can be done either by passing data directory (using --data_dir), or using `config.callflow.json` file (using --config). + +1. Using the directory (i.e., --data_dir). + +The user can input a directory of profiles of a "same" format for processing. + +.. code-block:: console + + $ python3 server/main.py --data_dir /path/to/dataset --profile_format hpctoolkit --process + +Note: The processing step typically entails some filtering and aggregation of data to produce the reduced graphs at desired granularity. To do this, the user can currently provide 3 arguments, namely --filter_by, --filter_perc, and --group_by. + +2. Using the config file (i.e., --config). + +The user can process profiles from different formats using the config file. The parameters of the preprocessing are provided through a config file (see examples of config files in the sample data directories). For example, the user can pass other arguments (e.g., save_path, filter_perc, etc.). + +.. 
+
 
 Using CallFlow as a web app
 ---------------------------
+To run CallFlow's web app, a Python-based WSGI server (which handles socket communication and data processing) and a Vue client server need to be run simultaneously.
 
-The first step is to process the raw datasets to use with CallFlow. This preprocessing typically entails some filtering and aggregation of data to produce the reduced graphs at desired granularity. The parameters of the preprocessing are provided through a config file (see examples of config files in the sample data directories).
+1. Run the WSGI server.
 
-To process the datasets,
+Note: Similar to the processing step, the web server can be run either using --config or --data_dir.
 
 .. code-block:: console
 
-    $ python3 server/main.py --config {config_file_path} --process
+    $ python3 server/main.py --data_dir /path/to/dataset
 
-Next, the server can be run using the following command,
+or
 
 .. code-block:: console
 
-    $ python3 server/main.py --config {config_file_path}
+    $ python3 server/main.py --config /path/to/callflow.config.json
 
-To start the app,
+2. Run the client server.
 
 .. code-block:: console
 
diff --git a/requirements.txt b/requirements.txt
index df8cd030..b956a2e3 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -7,4 +7,6 @@ colorlog
 statsmodels
 matplotlib
 sklearn
+jsonschema
+eventlet
 hatchet
diff --git a/server/main.py b/server/main.py
index f2053ec6..eae90d3e 100644
--- a/server/main.py
+++ b/server/main.py
@@ -5,15 +5,13 @@
 # Library imports
 from flask import Flask
 from flask_socketio import SocketIO, emit
-import os
 import json
-import argparse
 from networkx.readwrite import json_graph
 
 # ------------------------------------------------------------------------------
 # CallFlow imports.
 import callflow
-from callflow.operations import ConfigFileReader
+from callflow.operations import ArgParser
 
 LOGGER = callflow.get_logger(__name__)
 
@@ -29,22 +27,15 @@ class CallFlowServer:
     """
 
     def __init__(self):
-        # Parse the arguments passed.
-        args = self._create_parser()
+        self.args = ArgParser().get_arguments()
 
-        # Verify if only valid things are passed.
-        self._verify_parser(args)
+        self.debug = True
+        self.production = False
+        # TODO: wire debug/production to the --verbose and --production flags via ArgParser.
+        self.process = self.args["process"]
 
-        self.debug = args.verbose or True
-        self.production = args.production or False
-        self.process = args.process
-
-        # Read the config file using config file reader.
-        self.config = ConfigFileReader(args.config)
-
-        ndatasets = len(self.config.datasets)
+        ndatasets = len(self.args["properties"]["runs"])
         assert ndatasets > 0
 
-        self.callflow = callflow.CallFlow(config=self.config, ensemble=ndatasets > 1)
+        self.callflow = callflow.CallFlow(config=self.args, ensemble=ndatasets > 1)
 
         if self.process:
             self.callflow.process()
@@ -56,40 +47,6 @@ def __init__(self):
         self._create_server()
 
     # ------------------------------------------------------------------------------
-    # Private methods.
-    @staticmethod
-    def _create_parser():
-        """
-        Parse the input arguments
-        """
-        parser = argparse.ArgumentParser()
-        parser.add_argument(
-            "--verbose", action="store_true", help="Display debug points"
-        )
-        parser.add_argument("--config", help="Config file to be processed.")
-        parser.add_argument("--production", help="Launch app on production server.")
-
-        parser.add_argument(
-            "--process",
-            action="store_true",
-            help="Process mode. To preprocess at the required level of granularity, use the options --filter, --entire. If you are preprocessing multiple callgraphs, use --ensemble option.",
-        )
-        args = parser.parse_args()
-        return args
-
-    @staticmethod
-    def _verify_parser(args):
-        """
-        Raises expections if something is not provided
-        Check if the config file is provided and exists!
-        """
-        if not args.config:
-            s = "Please provide a config file. To see options, use --help"
-            raise Exception(s)
-
-        if not os.path.isfile(args.config):
-            s = "Config file ({}) not found!".format(args.config)
-            raise Exception(s)
 
     def _create_server(self):
         """
@@ -102,7 +59,7 @@ def _create_server(self):
         # Socket request handlers
         self._request_handler_general()
-        if len(self.config.datasets) == 1:
+        if len(self.args["properties"]["runs"]) == 1:
             self._request_handler_single()
         else:
-            self._request_handler_single()
+            self._request_handler_ensemble()
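
Note: taken together, the changes above reduce server startup to the following
sketch (illustrative; it only restates how CallFlowServer.__init__ wires the
new pieces together):

    from callflow.operations import ArgParser
    import callflow

    args = ArgParser().get_arguments()  # validated dict, including args["properties"]["runs"]
    runs = args["properties"]["runs"]
    cf = callflow.CallFlow(config=args, ensemble=len(runs) > 1)
    if args["process"]:
        cf.process()  # writes per-run folders under args["save_path"]
    else:
        cf.load()     # reads the processed supergraphs for rendering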