From 5b1014e938ff72e4645b4eddd75656e4724ead6a Mon Sep 17 00:00:00 2001
From: Michelle Gower
Date: Fri, 4 Oct 2024 14:10:17 -0500
Subject: [PATCH] Add QGraph dependencies to dimension clustering.

---
 doc/changes/DM-46513.misc.rst                 |   1 +
 doc/lsst.ctrl.bps/quickstart.rst              |  37 +++
 .../lsst/ctrl/bps/quantum_clustering_funcs.py | 286 ++++++++++++++----
 tests/cqg_test_utils.py                       |  24 +-
 tests/qg_test_utils.py                        |  96 +++---
 tests/test_quantum_clustering_funcs.py        | 256 +++++++++++++++-
 6 files changed, 580 insertions(+), 120 deletions(-)

diff --git a/doc/changes/DM-46513.misc.rst b/doc/changes/DM-46513.misc.rst
index d6d30779..339bb81a 100644
--- a/doc/changes/DM-46513.misc.rst
+++ b/doc/changes/DM-46513.misc.rst
@@ -1,2 +1,3 @@
 Fixed test_clustered_quantum_graph.testClusters.
 Moved validation of ClusteredQuantumGraph from tests to class definition.
+Added following of QuantumGraph dependencies to dimension clustering to enable clustering when dimension values aren't equal (e.g., group vs visit).
diff --git a/doc/lsst.ctrl.bps/quickstart.rst b/doc/lsst.ctrl.bps/quickstart.rst
index 1623f43d..4fbddef4 100644
--- a/doc/lsst.ctrl.bps/quickstart.rst
+++ b/doc/lsst.ctrl.bps/quickstart.rst
@@ -1099,6 +1099,43 @@ Relevant Config Entries:
       # requestCpus: N     # Overrides for jobs in this cluster
       # requestMemory: NNNN  # MB, Overrides for jobs in this cluster
 
+.. _bps-dimension_dependency:
+
+User-defined Dimension Clustering with QuantumGraph Dependencies
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+There are instances where the quanta that we want to put in the same
+cluster do not all have the same dimension values. In many cases,
+``equalDimensions`` can solve this problem, but it only works when the
+values are equal and just the dimension names differ (e.g., visit and
+exposure). In the case of group and visit, the values themselves
+differ. Instead of comparing dimension values, the dependencies
+between those quanta in the QuantumGraph can be used.
+
+Using the dependencies is an option per cluster definition. To enable
+it, define ``find_dependency_method``. A subgraph of the pipeline
+graph is made (i.e., a directed graph of the pipeline task labels
+specified for the cluster). A value of ``sink`` says to use the
+dimensions of the sink nodes in the subgraph and then find ancestors
+in the ``QuantumGraph`` to complete the cluster. A value of ``source``
+says to use the dimensions of the source nodes in the subgraph and
+then find descendants in the ``QuantumGraph`` to complete the cluster.
+Generally, it doesn't matter which direction is used, but the
+direction determines which dimension values appear in the cluster
+names and thus the job names.
+
+.. code-block:: YAML
+
+   clusterAlgorithm: lsst.ctrl.bps.quantum_clustering_funcs.dimension_clustering
+   cluster:
+     # Repeat cluster subsection for however many clusters there are
+     # with or without find_dependency_method
+     clusterLabel1:
+       dimensions: visit, detector
+       pipetasks: getRegionTimeFromVisit, loadDiaCatalogs, diaPipe
+       find_dependency_method: sink
+       # requestCpus: N     # Overrides for jobs in this cluster
+       # requestMemory: NNNN  # MB, Overrides for jobs in this cluster
+
 ..
_bps-softlink: WMS-id softlink diff --git a/python/lsst/ctrl/bps/quantum_clustering_funcs.py b/python/lsst/ctrl/bps/quantum_clustering_funcs.py index 6ad1c44e..26cb301d 100644 --- a/python/lsst/ctrl/bps/quantum_clustering_funcs.py +++ b/python/lsst/ctrl/bps/quantum_clustering_funcs.py @@ -25,15 +25,14 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -"""Functions that convert QuantumGraph into ClusteredQuantumGraph. -""" +"""Functions that convert QuantumGraph into ClusteredQuantumGraph.""" import logging import re from collections import defaultdict from typing import Any from uuid import UUID -from lsst.pipe.base import QuantumGraph +from lsst.pipe.base import QuantumGraph, QuantumNode from networkx import DiGraph, is_directed_acyclic_graph, topological_sort from . import BpsConfig, ClusteredQuantumGraph, QuantaCluster @@ -119,8 +118,8 @@ def _check_clusters_tasks( cluster_labels: `list` [`str`] Dependency ordered list of cluster labels (includes single quantum clusters). - ordered_tasks : `dict` [`str`, `list` [`str`]] - Mapping of cluster label to ordered list of task labels. + ordered_tasks : `dict` [`str`, `networkx.DiGraph`] + Mapping of cluster label to task subgraph. Raises ------ @@ -162,7 +161,7 @@ def _check_clusters_tasks( if cluster_tasks_in_qgraph: # Ensure have list of tasks in dependency order. quantum_subgraph = label_graph.subgraph(cluster_tasks_in_qgraph) - ordered_tasks[cluster_label] = list(topological_sort(quantum_subgraph)) + ordered_tasks[cluster_label] = quantum_subgraph clustered_task_graph.add_node(cluster_label) @@ -171,7 +170,8 @@ def _check_clusters_tasks( if label not in used_labels: task_to_cluster[label] = label clustered_task_graph.add_node(label) - ordered_tasks[label] = [label] + # ordered_tasks[label] = [label] + ordered_tasks[label] = label_graph.subgraph([label]) # Create dependencies between clusters. for edge in task_graph.edges: @@ -219,14 +219,24 @@ def dimension_clustering(config: BpsConfig, qgraph: QuantumGraph, name: str) -> for cluster_label in cluster_labels: _LOG.debug("cluster = %s", cluster_label) if cluster_label in cluster_section: - add_dim_clusters( - cluster_section[cluster_label], - cluster_label, - qgraph, - ordered_tasks, - cqgraph, - quantum_to_cluster, - ) + if "find_dependency_method" in cluster_section[cluster_label]: + add_dim_clusters_dependency( + cluster_section[cluster_label], + cluster_label, + qgraph, + ordered_tasks, + cqgraph, + quantum_to_cluster, + ) + else: + add_dim_clusters( + cluster_section[cluster_label], + cluster_label, + qgraph, + ordered_tasks, + cqgraph, + quantum_to_cluster, + ) else: add_clusters_per_quantum(config, cluster_label, qgraph, cqgraph, quantum_to_cluster) @@ -283,7 +293,7 @@ def add_dim_clusters( cluster_config: BpsConfig, cluster_label: str, qgraph: QuantumGraph, - ordered_tasks: dict[str, list[str]], + ordered_tasks: dict[str, DiGraph], cqgraph: ClusteredQuantumGraph, quantum_to_cluster: dict[UUID, str], ) -> None: @@ -297,8 +307,8 @@ def add_dim_clusters( Cluster label for which to add clusters. qgraph : `lsst.pipe.base.QuantumGraph` QuantumGraph providing quanta for the clusters. - ordered_tasks : `dict` [`str`, `list` [`str`]] - Mapping of cluster label to ordered list of task labels. + ordered_tasks : `dict` [`str`, `networkx.DiGraph`] + Mapping of cluster label to task label subgraph. 
     cqgraph : `lsst.ctrl.bps.ClusteredQuantumGraph`
         The ClusteredQuantumGraph to which the new
         1-quantum clusters are added (modified in method).
@@ -321,51 +331,17 @@
     _LOG.debug("template = %s", template)
 
     new_clusters = []
-    for task_label in ordered_tasks[cluster_label]:
+    for task_label in topological_sort(ordered_tasks[cluster_label]):
         # Determine cluster for each node
-        for uuid, quantum in qgraph.get_task_quanta(task_label).items():
-            # Gather info for cluster name template into a dictionary.
-            info: dict[str, Any] = {"node_number": uuid}
-
-            missing_info = set()
-            assert quantum.dataId is not None, "Quantum DataId cannot be None"  # for mypy
-            data_id_info = dict(quantum.dataId.mapping)
-            for dim_name in cluster_dims:
-                _LOG.debug("dim_name = %s", dim_name)
-                if dim_name in data_id_info:
-                    info[dim_name] = data_id_info[dim_name]
-                else:
-                    missing_info.add(dim_name)
-            if equal_dims:
-                for pair in [pt.strip() for pt in equal_dims.split(",")]:
-                    dim1, dim2 = pair.strip().split(":")
-                    if dim1 in cluster_dims and dim2 in data_id_info:
-                        info[dim1] = data_id_info[dim2]
-                        missing_info.remove(dim1)
-                    elif dim2 in cluster_dims and dim1 in data_id_info:
-                        info[dim2] = data_id_info[dim1]
-                        missing_info.remove(dim2)
-
-            info["label"] = cluster_label
-            _LOG.debug("info for template = %s", info)
-
-            if missing_info:
-                raise RuntimeError(
-                    f"Quantum {uuid} ({data_id_info}) missing dimensions: {','.join(missing_info)}; "
-                    f"required for cluster {cluster_label}"
-                )
-
-            # Use dictionary plus template format string to create name.
-            # To avoid # key errors from generic patterns, use defaultdict.
-            cluster_name = template.format_map(defaultdict(lambda: "", info))
-            cluster_name = re.sub("_+", "_", cluster_name)
-
-            # Some dimensions contain slash which must be replaced.
-            cluster_name = re.sub("/", "_", cluster_name)
-            _LOG.debug("cluster_name = %s", cluster_name)
+        task_def = qgraph.findTaskDefByLabel(task_label)
+        assert task_def is not None  # for mypy
+        for node in qgraph.getNodesForTask(task_def):
+            cluster_name, info = get_cluster_name_from_node(
+                node, cluster_dims, cluster_label, template, equal_dims
+            )
 
             # Save mapping for use when creating dependencies.
-            quantum_to_cluster[uuid] = cluster_name
+            quantum_to_cluster[node.nodeId] = cluster_name
 
             # Add cluster to the ClusteredQuantumGraph.
             # Saving NodeId instead of number because QuantumGraph API
@@ -375,7 +351,7 @@
             else:
                 cluster = QuantaCluster(cluster_name, cluster_label, info)
                 cqgraph.add_cluster(cluster)
-            cluster.add_quantum(uuid, task_label)
+            cluster.add_quantum(node.nodeId, task_label)
             new_clusters.append(cluster)
 
     for cluster in new_clusters:
@@ -421,3 +397,191 @@ def add_cluster_dependencies(
                 qnode.quantum.dataId,
             )
             raise
+
+
+def add_dim_clusters_dependency(
+    cluster_config: BpsConfig,
+    cluster_label: str,
+    qgraph: QuantumGraph,
+    ordered_tasks: dict[str, DiGraph],
+    cqgraph: ClusteredQuantumGraph,
+    quantum_to_cluster: dict[UUID, str],
+) -> None:
+    """Add clusters for a cluster label to a ClusteredQuantumGraph, using
+    QuantumGraph dependencies as well as dimension values to help when
+    some quanta do not have a particular dimension value.
+
+    Parameters
+    ----------
+    cluster_config : `lsst.ctrl.bps.BpsConfig`
+        BPS configuration for specific cluster label.
+    cluster_label : `str`
+        Cluster label for which to add clusters.
+    qgraph : `lsst.pipe.base.QuantumGraph`
+        QuantumGraph providing quanta for the clusters.
+    ordered_tasks : `dict` [`str`, `networkx.DiGraph`]
+        Mapping of cluster label to task label subgraph.
+    cqgraph : `lsst.ctrl.bps.ClusteredQuantumGraph`
+        The ClusteredQuantumGraph to which the new
+        clusters are added (modified in method).
+    quantum_to_cluster : `dict` [`uuid.UUID`, `str`]
+        Mapping of quantum node id to the cluster to which it was added
+        (modified in method).
+    """
+    cluster_dims = []
+    if "dimensions" in cluster_config:
+        cluster_dims = [d.strip() for d in cluster_config["dimensions"].split(",")]
+    _LOG.debug("cluster_dims = %s", cluster_dims)
+    equal_dims = cluster_config.get("equalDimensions", None)
+
+    found, template = cluster_config.search("clusterTemplate", opt={"replaceVars": False})
+    if not found:
+        if cluster_dims:
+            template = f"{cluster_label}_" + "_".join(f"{{{dim}}}" for dim in cluster_dims)
+        else:
+            template = cluster_label
+    _LOG.debug("template = %s", template)
+
+    method = cluster_config["find_dependency_method"]
+    match method:
+        case "source":
+            dim_labels = [
+                node for node, in_degree in ordered_tasks[cluster_label].in_degree() if in_degree == 0
+            ]
+        case "sink":
+            dim_labels = [
+                node for node, out_degree in ordered_tasks[cluster_label].out_degree() if out_degree == 0
+            ]
+        case _:
+            raise RuntimeError(f"Invalid find_dependency_method ({method}) for cluster {cluster_label}")
+
+    new_clusters = []
+    for task_label in dim_labels:
+        task_def = qgraph.findTaskDefByLabel(task_label)
+        assert task_def is not None  # for mypy
+        for node in qgraph.getNodesForTask(task_def):
+            cluster_name, info = get_cluster_name_from_node(
+                node, cluster_dims, cluster_label, template, equal_dims
+            )
+
+            # Add cluster to the ClusteredQuantumGraph.
+            # Saving NodeId instead of number because QuantumGraph API
+            # requires it for creating per-job QuantumGraphs.
+            if cluster_name in cqgraph:
+                cluster = cqgraph.get_cluster(cluster_name)
+            else:
+                cluster = QuantaCluster(cluster_name, cluster_label, info)
+                cqgraph.add_cluster(cluster)
+            cluster.add_quantum(node.nodeId, task_label)
+
+            # Save mapping for use when creating dependencies.
+            quantum_to_cluster[node.nodeId] = cluster_name
+
+            # Use dependencies to find other quanta to add to the cluster.
+            # Note: in testing, this loop was faster than using the networkx
+            # descendants and ancestors functions.
+            nodes_to_use = [node]
+            nodes_already_visited: dict[UUID, bool] = {}
+            while nodes_to_use:
+                node_to_use = nodes_to_use.pop()
+                match method:
+                    case "source":
+                        possible_nodes = qgraph.determineOutputsOfQuantumNode(node_to_use)
+                    case "sink":
+                        possible_nodes = qgraph.determineInputsToQuantumNode(node_to_use)
+                for possible_node in possible_nodes:
+                    if possible_node.taskDef.label in ordered_tasks[cluster_label]:
+                        if possible_node.nodeId not in nodes_already_visited:
+                            _LOG.debug(
+                                "Adding possible quantum %s (%s) to cluster %s",
+                                possible_node.nodeId,
+                                possible_node.taskDef.label,
+                                cluster_name,
+                            )
+                            cluster.add_quantum(possible_node.nodeId, possible_node.taskDef.label)
+                            quantum_to_cluster[possible_node.nodeId] = cluster_name
+                            nodes_already_visited[possible_node.nodeId] = True
+                            nodes_to_use.append(possible_node)
+                    else:
+                        _LOG.debug(
+                            "label (%s) not in ordered_tasks. 
Not adding possible quantum %s", + possible_node.taskDef.label, + possible_node.nodeId, + ) + + new_clusters.append(cluster) + + for cluster in new_clusters: + add_cluster_dependencies(cqgraph, cluster, quantum_to_cluster) + + +def get_cluster_name_from_node( + node: QuantumNode, + cluster_dims: list[str], + cluster_label: str, + template: str, + equal_dims: str, +) -> tuple[str, dict[str, Any]]: + """Get the cluster name in which to add the given node. + + Parameters + ---------- + node : `lsst.pipe.base.QuantumNode` + QuantumNode from which to create the cluster. + cluster_dims : `list` [ `str` ] + List of dimension names to be used when clustering. + cluster_label : `str` + Cluster label. + template : `str` + Template for the cluster name. + equal_dims : `str` + Configuration describing any dimensions to be considered equal. + + Returns + ------- + cluster_name : `str` + Name of the cluster in which to add the given node. + info : dict [`str`, `str`] + Information needed if creating a new node. + """ + # Gather info for cluster name template into a dictionary. + info: dict[str, Any] = {"node_number": node.nodeId} + + missing_info = set() + assert node.quantum.dataId is not None # for mypy + data_id_info = dict(node.quantum.dataId.mapping) + for dim_name in cluster_dims: + _LOG.debug("dim_name = %s", dim_name) + if dim_name in data_id_info: + info[dim_name] = data_id_info[dim_name] + else: + missing_info.add(dim_name) + if equal_dims: + for pair in [pt.strip() for pt in equal_dims.split(",")]: + dim1, dim2 = pair.strip().split(":") + if dim1 in cluster_dims and dim2 in data_id_info: + info[dim1] = data_id_info[dim2] + missing_info.remove(dim1) + elif dim2 in cluster_dims and dim1 in data_id_info: + info[dim2] = data_id_info[dim1] + missing_info.remove(dim2) + + info["label"] = cluster_label + _LOG.debug("info for template = %s", info) + + if missing_info: + raise RuntimeError( + f"Quantum {node.nodeId} ({data_id_info}) missing dimensions: {','.join(missing_info)}; " + f"required for cluster {cluster_label}" + ) + + # Use dictionary plus template format string to create name. + # To avoid # key errors from generic patterns, use defaultdict. + cluster_name = template.format_map(defaultdict(lambda: "", info)) + cluster_name = re.sub("_+", "_", cluster_name) + + # Some dimensions contain slash which must be replaced. + cluster_name = re.sub("/", "_", cluster_name) + _LOG.debug("cluster_name = %s", cluster_name) + + return cluster_name, info diff --git a/tests/cqg_test_utils.py b/tests/cqg_test_utils.py index 066cd137..3e8faea2 100644 --- a/tests/cqg_test_utils.py +++ b/tests/cqg_test_utils.py @@ -152,11 +152,11 @@ def compare_cqg_dicts(truth, cqg): ), f"Mismatch edges: truth={truth['edges']}, cqg={cqg['edges']}" -# T1(1,2) T1(3,4) T4(1,2) T4(3,4) -# | | -# T2(1,2) T2(3,4) -# | | -# T3(1,2) T3(3,4) +# T1(1,2) T1(1,4) T1(3,4) T4(1,2) T4(3,4) +# | | | +# T2(1,2) T2(1,4) T2(3,4) +# | | | +# T3(1,2) T3(1,4) T3(3,4) def make_test_clustered_quantum_graph(outdir): """Make a ClusteredQuantumGraph for testing. 
@@ -189,6 +189,8 @@ def make_test_clustered_quantum_graph(outdir): # Add orphans cluster = QuantaCluster.from_quantum_node(test_lookup["T4_1_2"], "T4_1_2") cqg.add_cluster(cluster) + cluster = QuantaCluster.from_quantum_node(test_lookup["T4_1_4"], "T4_1_4") + cqg.add_cluster(cluster) cluster = QuantaCluster.from_quantum_node(test_lookup["T4_3_4"], "T4_3_4") cqg.add_cluster(cluster) @@ -196,7 +198,15 @@ def make_test_clustered_quantum_graph(outdir): qc1 = QuantaCluster.from_quantum_node(test_lookup["T1_1_2"], "T1_1_2") qc2 = QuantaCluster.from_quantum_node(test_lookup["T2_1_2"], "T23_1_2") qc2.add_quantum_node(test_lookup["T3_1_2"]) - qc2.label = "clusterT2T3" # update label so doesnt look like only T2 + qc2.label = "clusterT2T3" # update label so doesnt look like only T2 + cqg.add_cluster([qc2, qc1]) # reversed to check order is corrected in tests + cqg.add_dependency(qc1, qc2) + + # T1,T2,T3 Dim1 = 1, Dim2 = 4 + qc1 = QuantaCluster.from_quantum_node(test_lookup["T1_1_4"], "T1_1_4") + qc2 = QuantaCluster.from_quantum_node(test_lookup["T2_1_4"], "T23_1_4") + qc2.add_quantum_node(test_lookup["T3_1_4"]) + qc2.label = "clusterT2T3" # update label so doesnt look like only T2 cqg.add_cluster([qc2, qc1]) # reversed to check order is corrected in tests cqg.add_dependency(qc1, qc2) @@ -204,7 +214,7 @@ def make_test_clustered_quantum_graph(outdir): qc1 = QuantaCluster.from_quantum_node(test_lookup["T1_3_4"], "T1_3_4") qc2 = QuantaCluster.from_quantum_node(test_lookup["T2_3_4"], "T23_3_4") qc2.add_quantum_node(test_lookup["T3_3_4"]) - qc2.label = "clusterT2T3" # update label so doesnt look like only T2 + qc2.label = "clusterT2T3" # update label so doesnt look like only T2 cqg.add_cluster([qc2, qc1]) # reversed to check order is corrected in tests cqg.add_dependency(qc1, qc2) diff --git a/tests/qg_test_utils.py b/tests/qg_test_utils.py index 1b3ea3eb..71207426 100644 --- a/tests/qg_test_utils.py +++ b/tests/qg_test_utils.py @@ -128,6 +128,48 @@ class Dummy4PipelineTask(PipelineTask): ConfigClass = Dummy4Config +def _make_quantum(run, universe, task, task_def, dim1, dim2, intermediate_refs): + if task_def.connections.initInputs: + init_init_ds_type = DatasetType( + task_def.connections.initInput.name, + (), + storageClass=task_def.connections.initInput.storageClass, + universe=universe, + ) + init_refs = [DatasetRef(init_init_ds_type, DataCoordinate.make_empty(universe), run=run)] + else: + init_refs = None + input_ds_type = DatasetType( + task_def.connections.input.name, + task_def.connections.input.dimensions, + storageClass=task_def.connections.input.storageClass, + universe=universe, + ) + data_id = DataCoordinate.standardize({"D1": dim1, "D2": dim2}, universe=universe) + if ref := intermediate_refs.get((input_ds_type, data_id)): + input_refs = [ref] + else: + input_refs = [DatasetRef(input_ds_type, data_id, run=run)] + output_ds_type = DatasetType( + task_def.connections.output.name, + task_def.connections.output.dimensions, + storageClass=task_def.connections.output.storageClass, + universe=universe, + ) + ref = DatasetRef(output_ds_type, data_id, run=run) + intermediate_refs[(output_ds_type, data_id)] = ref + output_refs = [ref] + quantum = Quantum( + taskName=task.__qualname__, + dataId=data_id, + taskClass=task, + initInputs=init_refs, + inputs={input_ds_type: input_refs}, + outputs={output_ds_type: output_refs}, + ) + return quantum + + def make_test_quantum_graph(run: str = "run"): """Create a QuantumGraph for unit tests. 
@@ -146,11 +188,11 @@ def make_test_quantum_graph(run: str = "run"): .. code-block:: - T1(1,2) T1(3,4) T4(1,2) T4(3,4) - | | - T2(1,2) T2(3,4) - | | - T3(1,2) T3(3,4) + T1(1,2) T1(1,4) T1(3,4) T4(1,2) T4(3,4) + | | | + T2(1,2) T2(1,4) T2(3,4) + | | | + T3(1,2) T3(1,4) T3(3,4) """ config = Config( { @@ -205,47 +247,9 @@ def make_test_quantum_graph(run: str = "run"): task_def = TaskDef(get_full_type_name(task), task.ConfigClass(), task, label) tasks.append(task_def) quantum_set = set() - for dim1, dim2 in ((1, 2), (3, 4)): - if task_def.connections.initInputs: - init_init_ds_type = DatasetType( - task_def.connections.initInput.name, - (), - storageClass=task_def.connections.initInput.storageClass, - universe=universe, - ) - init_refs = [DatasetRef(init_init_ds_type, DataCoordinate.make_empty(universe), run=run)] - else: - init_refs = None - input_ds_type = DatasetType( - task_def.connections.input.name, - task_def.connections.input.dimensions, - storageClass=task_def.connections.input.storageClass, - universe=universe, - ) - data_id = DataCoordinate.standardize({"D1": dim1, "D2": dim2}, universe=universe) - if ref := intermediate_refs.get((input_ds_type, data_id)): - input_refs = [ref] - else: - input_refs = [DatasetRef(input_ds_type, data_id, run=run)] - output_ds_type = DatasetType( - task_def.connections.output.name, - task_def.connections.output.dimensions, - storageClass=task_def.connections.output.storageClass, - universe=universe, - ) - ref = DatasetRef(output_ds_type, data_id, run=run) - intermediate_refs[(output_ds_type, data_id)] = ref - output_refs = [ref] - quantum_set.add( - Quantum( - taskName=task.__qualname__, - dataId=data_id, - taskClass=task, - initInputs=init_refs, - inputs={input_ds_type: input_refs}, - outputs={output_ds_type: output_refs}, - ) - ) + for dim1, dim2 in ((1, 2), (1, 4), (3, 4)): + quantum = _make_quantum(run, universe, task, task_def, dim1, dim2, intermediate_refs) + quantum_set.add(quantum) quantum_map[task_def] = quantum_set qgraph = QuantumGraph(quantum_map, metadata=METADATA) diff --git a/tests/test_quantum_clustering_funcs.py b/tests/test_quantum_clustering_funcs.py index 5c15b272..378041cf 100644 --- a/tests/test_quantum_clustering_funcs.py +++ b/tests/test_quantum_clustering_funcs.py @@ -105,6 +105,11 @@ def testClusterAllInOne(self): "dims": {"D1": 1, "D2": 2}, "counts": {"T1": 1, "T2": 1, "T3": 1, "T4": 1}, }, + "cl1_1_4": { + "label": "cl1", + "dims": {"D1": 1, "D2": 4}, + "counts": {"T1": 1, "T2": 1, "T3": 1, "T4": 1}, + }, "cl1_3_4": { "label": "cl1", "dims": {"D1": 3, "D2": 4}, @@ -141,6 +146,11 @@ def testClusterTemplate(self): "dims": {"D1": 1, "D2": 2}, "counts": {"T1": 1, "T2": 1, "T3": 1, "T4": 1}, }, + "ct_1_4_": { + "label": "cl1", + "dims": {"D1": 1, "D2": 4}, + "counts": {"T1": 1, "T2": 1, "T3": 1, "T4": 1}, + }, "ct_3_4_": { "label": "cl1", "dims": {"D1": 3, "D2": 4}, @@ -172,8 +182,8 @@ def testClusterNoDims(self): answer = { "name": name, "nodes": { - "cl1": {"label": "cl1", "dims": {}, "counts": {"T1": 2, "T2": 2}}, - "cl2": {"label": "cl2", "dims": {}, "counts": {"T3": 2, "T4": 2}}, + "cl1": {"label": "cl1", "dims": {}, "counts": {"T1": 3, "T2": 3}}, + "cl2": {"label": "cl2", "dims": {}, "counts": {"T3": 3, "T4": 3}}, }, "edges": [("cl1", "cl2")], } @@ -236,6 +246,11 @@ def testClusterEqualDim1(self): "dims": {"D1": 1, "NotThere": 2}, "counts": {"T1": 1, "T2": 1, "T3": 1, "T4": 1}, }, + "cl1_1_4": { + "label": "cl1", + "dims": {"D1": 1, "NotThere": 4}, + "counts": {"T1": 1, "T2": 1, "T3": 1, "T4": 1}, + }, "cl1_3_4": { "label": 
"cl1", "dims": {"D1": 3, "NotThere": 4}, @@ -271,6 +286,11 @@ def testClusterEqualDim2(self): "dims": {"D1": 1, "NotThere": 2}, "counts": {"T1": 1, "T2": 1, "T3": 1, "T4": 1}, }, + "cl1_1_4": { + "label": "cl1", + "dims": {"D1": 1, "NotThere": 4}, + "counts": {"T1": 1, "T2": 1, "T3": 1, "T4": 1}, + }, "cl1_3_4": { "label": "cl1", "dims": {"D1": 3, "NotThere": 4}, @@ -299,11 +319,13 @@ def testClusterMult(self): "name": name, "nodes": { "cl1_1_2": {"label": "cl1", "dims": {"D1": 1, "D2": 2}, "counts": {"T1": 1, "T2": 1}}, + "cl1_1_4": {"label": "cl1", "dims": {"D1": 1, "D2": 4}, "counts": {"T1": 1, "T2": 1}}, "cl1_3_4": {"label": "cl1", "dims": {"D1": 3, "D2": 4}, "counts": {"T1": 1, "T2": 1}}, "cl2_1_2": {"label": "cl2", "dims": {"D1": 1, "D2": 2}, "counts": {"T3": 1, "T4": 1}}, + "cl2_1_4": {"label": "cl2", "dims": {"D1": 1, "D2": 4}, "counts": {"T3": 1, "T4": 1}}, "cl2_3_4": {"label": "cl2", "dims": {"D1": 3, "D2": 4}, "counts": {"T3": 1, "T4": 1}}, }, - "edges": [("cl1_3_4", "cl2_3_4"), ("cl1_1_2", "cl2_1_2")], + "edges": [("cl1_3_4", "cl2_3_4"), ("cl1_1_2", "cl2_1_2"), ("cl1_1_4", "cl2_1_4")], } cqg = dimension_clustering(config, self.qgraph, name) @@ -324,13 +346,20 @@ def testClusterPart(self): "name": name, "nodes": { "cl1_1_2": {"label": "cl1", "dims": {"D1": 1, "D2": 2}, "counts": {"T1": 1, "T2": 1}}, + "cl1_1_4": {"label": "cl1", "dims": {"D1": 1, "D2": 4}, "counts": {"T1": 1, "T2": 1}}, "cl1_3_4": {"label": "cl1", "dims": {"D1": 3, "D2": 4}, "counts": {"T1": 1, "T2": 1}}, "NODENAME_T3_1_2_": {"label": "T3", "dims": {"D1": 1, "D2": 2}, "counts": {"T3": 1}}, + "NODENAME_T3_1_4_": {"label": "T3", "dims": {"D1": 1, "D2": 4}, "counts": {"T3": 1}}, "NODENAME_T3_3_4_": {"label": "T3", "dims": {"D1": 3, "D2": 4}, "counts": {"T3": 1}}, "NODENAME_T4_1_2_": {"label": "T4", "dims": {"D1": 1, "D2": 2}, "counts": {"T4": 1}}, + "NODENAME_T4_1_4_": {"label": "T4", "dims": {"D1": 1, "D2": 4}, "counts": {"T4": 1}}, "NODENAME_T4_3_4_": {"label": "T4", "dims": {"D1": 3, "D2": 4}, "counts": {"T4": 1}}, }, - "edges": [("cl1_1_2", "NODENAME_T3_1_2_"), ("cl1_3_4", "NODENAME_T3_3_4_")], + "edges": [ + ("cl1_1_2", "NODENAME_T3_1_2_"), + ("cl1_3_4", "NODENAME_T3_3_4_"), + ("cl1_1_4", "NODENAME_T3_1_4_"), + ], } cqg = dimension_clustering(config, self.qgraph, name) @@ -349,12 +378,17 @@ def testClusterPartNoTemplate(self): answer = { "name": name, "nodes": { - "cl1": {"label": "cl1", "dims": {}, "counts": {"T1": 2, "T2": 2}}, + "cl1": {"label": "cl1", "dims": {}, "counts": {"T1": 3, "T2": 3}}, "NODEONLY_T3_{'D1': 1, 'D2': 2}": { "label": "T3", "dims": {"D1": 1, "D2": 2}, "counts": {"T3": 1}, }, + "NODEONLY_T3_{'D1': 1, 'D2': 4}": { + "label": "T3", + "dims": {"D1": 1, "D2": 4}, + "counts": {"T3": 1}, + }, "NODEONLY_T3_{'D1': 3, 'D2': 4}": { "label": "T3", "dims": {"D1": 3, "D2": 4}, @@ -365,13 +399,22 @@ def testClusterPartNoTemplate(self): "dims": {"D1": 1, "D2": 2}, "counts": {"T4": 1}, }, + "NODEONLY_T4_{'D1': 1, 'D2': 4}": { + "label": "T4", + "dims": {"D1": 1, "D2": 4}, + "counts": {"T4": 1}, + }, "NODEONLY_T4_{'D1': 3, 'D2': 4}": { "label": "T4", "dims": {"D1": 3, "D2": 4}, "counts": {"T4": 1}, }, }, - "edges": [("cl1", "NODEONLY_T3_{'D1': 1, 'D2': 2}"), ("cl1", "NODEONLY_T3_{'D1': 3, 'D2': 4}")], + "edges": [ + ("cl1", "NODEONLY_T3_{'D1': 1, 'D2': 2}"), + ("cl1", "NODEONLY_T3_{'D1': 1, 'D2': 4}"), + ("cl1", "NODEONLY_T3_{'D1': 3, 'D2': 4}"), + ], } cqg = dimension_clustering(config, self.qgraph, name) @@ -398,6 +441,11 @@ def testClusterExtra(self): "dims": {"D1": 1, "D2": 2}, "counts": {"T1": 1, 
"T2": 1, "T3": 1, "T4": 1}, }, + "cl1_1_4": { + "label": "cl1", + "dims": {"D1": 1, "D2": 4}, + "counts": {"T1": 1, "T2": 1, "T3": 1, "T4": 1}, + }, "cl1_3_4": { "label": "cl1", "dims": {"D1": 3, "D2": 4}, @@ -465,6 +513,202 @@ def testClusterOrder(self): ) processed.add(cluster.name) + def testAddClusterDependenciesSink(self): + name = "AddClusterDependenciesSink" + config = BpsConfig( + { + "templateDataId": "{D1}_{D2}_{D3}_{D4}", + "cluster": { + "cl1": { + "pipetasks": "T1, T2, T3", + "dimensions": "D1, D2", + "find_dependency_method": "sink", + }, + }, + } + ) + answer = { + "name": name, + "nodes": { + "cl1_1_2": { + "label": "cl1", + "dims": {"D1": 1, "D2": 2}, + "counts": {"T1": 1, "T2": 1, "T3": 1}, + }, + "cl1_1_4": { + "label": "cl1", + "dims": {"D1": 1, "D2": 4}, + "counts": {"T1": 1, "T2": 1, "T3": 1}, + }, + "cl1_3_4": { + "label": "cl1", + "dims": {"D1": 3, "D2": 4}, + "counts": {"T1": 1, "T2": 1, "T3": 1}, + }, + "NODENAME_T4_1_2_": {"label": "T4", "dims": {"D1": 1, "D2": 2}, "counts": {"T4": 1}}, + "NODENAME_T4_1_4_": {"label": "T4", "dims": {"D1": 1, "D2": 4}, "counts": {"T4": 1}}, + "NODENAME_T4_3_4_": {"label": "T4", "dims": {"D1": 3, "D2": 4}, "counts": {"T4": 1}}, + }, + "edges": [], + } + cqg = dimension_clustering(config, self.qgraph, name) + check_cqg(cqg, answer) + + def testAddClusterDependenciesSource(self): + name = "AddClusterDependenciesSource" + config = BpsConfig( + { + "templateDataId": "{D1}_{D2}_{D3}_{D4}", + "cluster": { + "cl1": { + "pipetasks": "T1, T2, T3", + "dimensions": "D1, D2", + "find_dependency_method": "source", + }, + }, + } + ) + answer = { + "name": name, + "nodes": { + "cl1_1_2": { + "label": "cl1", + "dims": {"D1": 1, "D2": 2}, + "counts": {"T1": 1, "T2": 1, "T3": 1}, + }, + "cl1_1_4": { + "label": "cl1", + "dims": {"D1": 1, "D2": 4}, + "counts": {"T1": 1, "T2": 1, "T3": 1}, + }, + "cl1_3_4": { + "label": "cl1", + "dims": {"D1": 3, "D2": 4}, + "counts": {"T1": 1, "T2": 1, "T3": 1}, + }, + "NODENAME_T4_1_2_": {"label": "T4", "dims": {"D1": 1, "D2": 2}, "counts": {"T4": 1}}, + "NODENAME_T4_1_4_": {"label": "T4", "dims": {"D1": 1, "D2": 4}, "counts": {"T4": 1}}, + "NODENAME_T4_3_4_": {"label": "T4", "dims": {"D1": 3, "D2": 4}, "counts": {"T4": 1}}, + }, + "edges": [], + } + cqg = dimension_clustering(config, self.qgraph, name) + check_cqg(cqg, answer) + + def testAddClusterDependenciesExtra(self): + # Test if dependencies return more than in cluster + name = "AddClusterDependenciesExtra" + config = BpsConfig( + { + "templateDataId": "{D1}_{D2}_{D3}_{D4}", + "cluster": { + "cl1": { + "pipetasks": "T1, T2", + "dimensions": "D1, D2", + "find_dependency_method": "source", + }, + }, + } + ) + answer = { + "name": name, + "nodes": { + "cl1_1_2": { + "label": "cl1", + "dims": {"D1": 1, "D2": 2}, + "counts": {"T1": 1, "T2": 1}, + }, + "cl1_1_4": { + "label": "cl1", + "dims": {"D1": 1, "D2": 4}, + "counts": {"T1": 1, "T2": 1}, + }, + "cl1_3_4": { + "label": "cl1", + "dims": {"D1": 3, "D2": 4}, + "counts": {"T1": 1, "T2": 1}, + }, + "NODENAME_T3_1_2_": {"label": "T3", "dims": {"D1": 1, "D2": 2}, "counts": {"T3": 1}}, + "NODENAME_T3_1_4_": {"label": "T3", "dims": {"D1": 1, "D2": 4}, "counts": {"T3": 1}}, + "NODENAME_T3_3_4_": {"label": "T3", "dims": {"D1": 3, "D2": 4}, "counts": {"T3": 1}}, + "NODENAME_T4_1_2_": {"label": "T4", "dims": {"D1": 1, "D2": 2}, "counts": {"T4": 1}}, + "NODENAME_T4_1_4_": {"label": "T4", "dims": {"D1": 1, "D2": 4}, "counts": {"T4": 1}}, + "NODENAME_T4_3_4_": {"label": "T4", "dims": {"D1": 3, "D2": 4}, "counts": {"T4": 1}}, + 
}, + "edges": [ + ("cl1_1_2", "NODENAME_T3_1_2_"), + ("cl1_1_4", "NODENAME_T3_1_4_"), + ("cl1_3_4", "NODENAME_T3_3_4_"), + ], + } + cqg = dimension_clustering(config, self.qgraph, name) + check_cqg(cqg, answer) + + def testAddClusterDependenciesSameCluster(self): + name = "AddClusterDependenciesSameCluster" + config = BpsConfig( + { + "cluster": { + "cl1": { + "pipetasks": "T1, T2, T3", + "dimensions": "D1", + "find_dependency_method": "source", + "clusterTemplate": "ct_{D1}_{D2}_{D3}_{D4}", + }, + }, + } + ) + answer = { + "name": name, + "nodes": { + "ct_1_": { + "label": "cl1", + "dims": {"D1": 1}, + "counts": {"T1": 2, "T2": 2, "T3": 2}, + }, + "ct_3_": { + "label": "cl1", + "dims": {"D1": 3}, + "counts": {"T1": 1, "T2": 1, "T3": 1}, + }, + "NODEONLY_T4_{'D1': 1, 'D2': 2}": { + "label": "T4", + "dims": {"D1": 1, "D2": 2}, + "counts": {"T4": 1}, + }, + "NODEONLY_T4_{'D1': 1, 'D2': 4}": { + "label": "T4", + "dims": {"D1": 1, "D2": 4}, + "counts": {"T4": 1}, + }, + "NODEONLY_T4_{'D1': 3, 'D2': 4}": { + "label": "T4", + "dims": {"D1": 3, "D2": 4}, + "counts": {"T4": 1}, + }, + }, + "edges": [], + } + cqg = dimension_clustering(config, self.qgraph, name) + check_cqg(cqg, answer) + + def testAddClusterDependenciesBadMethod(self): + name = "AddClusterDependenciesBadMethod" + config = BpsConfig( + { + "templateDataId": "{D1}_{D2}_{D3}_{D4}", + "cluster": { + "cl1": { + "pipetasks": "T1, T2, T3", + "dimensions": "D1, D2", + "find_dependency_method": "bad", + }, + }, + } + ) + with self.assertRaises(RuntimeError): + _ = dimension_clustering(config, self.qgraph, name) + if __name__ == "__main__": unittest.main()
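As an illustration of the traversal that ``find_dependency_method`` enables, here is a small, self-contained sketch. It does not use the ``ctrl_bps`` or ``lsst.pipe.base`` APIs; the task labels, data IDs, and cluster-name scheme are made up, and only the sink-then-ancestors walk mirrors what ``add_dim_clusters_dependency`` in this patch does:

.. code-block:: python

   import networkx as nx

   # Toy stand-in for a QuantumGraph: nodes are (task_label, data_id) pairs and
   # edges point from a producer quantum to a consumer quantum.
   qg = nx.DiGraph()
   qg.add_edge(("getRegionTimeFromVisit", ("group", "g1")), ("loadDiaCatalogs", ("visit", 101)))
   qg.add_edge(("loadDiaCatalogs", ("visit", 101)), ("diaPipe", ("visit", 101)))

   cluster_tasks = {"getRegionTimeFromVisit", "loadDiaCatalogs", "diaPipe"}
   sink_task = "diaPipe"  # with find_dependency_method: sink, its dimensions name the cluster

   clusters: dict[str, set] = {}
   for quantum in [n for n in qg if n[0] == sink_task]:
       _, (_, value) = quantum
       cluster_name = f"cl_{value}"  # e.g. "cl_101", built from the visit value
       members = {quantum}
       # Walk ancestors in the toy graph, keeping only tasks that belong to the
       # cluster; this pulls in the group-dimensioned quantum without comparing
       # dimension values.
       stack = [quantum]
       while stack:
           for parent in qg.predecessors(stack.pop()):
               if parent[0] in cluster_tasks and parent not in members:
                   members.add(parent)
                   stack.append(parent)
       clusters[cluster_name] = members

   print(clusters)  # one cluster, cl_101, containing all three quanta (set order may vary)

With ``find_dependency_method: source`` the walk would instead start from the source task's quanta and follow successors to complete the cluster.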