diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 3160eae..eb64b73 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -2,13 +2,6 @@
 #   pre-commit install
 repos:
-  - repo: local
-    hooks:
-      - id: update-get_info
-        name: Update function info
-        entry: sh _nx_parallel/script.sh
-        language: system
-        pass_filenames: false
   - repo: https://github.com/adamchainz/blacken-docs
     rev: 1.16.0
     hooks:
@@ -26,3 +19,10 @@ repos:
         args:
           - --fix
       - id: ruff-format
+  - repo: local
+    hooks:
+      - id: update-get_info
+        name: Update function info
+        entry: sh _nx_parallel/script.sh
+        language: system
+        pass_filenames: false
diff --git a/_nx_parallel/update_get_info.py b/_nx_parallel/update_get_info.py
index 6611dd4..84b3996 100644
--- a/_nx_parallel/update_get_info.py
+++ b/_nx_parallel/update_get_info.py
@@ -20,7 +20,7 @@ def get_funcs_info():
     nx_parallel_dir = os.path.join(os.getcwd(), "nx_parallel")
 
     for root, dirs, files in os.walk(nx_parallel_dir):
-        if root[-29:] == "nx-parallel/nx_parallel/utils":
+        if "nx_parallel/utils" in root:
             continue
         for file in files:
             if (
diff --git a/nx_parallel/interface.py b/nx_parallel/interface.py
index 0c40973..38af8c7 100644
--- a/nx_parallel/interface.py
+++ b/nx_parallel/interface.py
@@ -1,50 +1,57 @@
-from nx_parallel.algorithms.bipartite.redundancy import node_redundancy
-from nx_parallel.algorithms.centrality.betweenness import (
-    betweenness_centrality,
-    edge_betweenness_centrality,
-)
-from nx_parallel.algorithms.shortest_paths.generic import all_pairs_all_shortest_paths
-from nx_parallel.algorithms.shortest_paths.weighted import (
-    all_pairs_dijkstra,
-    all_pairs_dijkstra_path_length,
-    all_pairs_dijkstra_path,
-    all_pairs_bellman_ford_path_length,
-    all_pairs_bellman_ford_path,
-    johnson,
-)
-from nx_parallel.algorithms.shortest_paths.unweighted import (
-    all_pairs_shortest_path,
-    all_pairs_shortest_path_length,
-)
-from nx_parallel.algorithms.efficiency_measures import local_efficiency
-from nx_parallel.algorithms.isolate import number_of_isolates
-from nx_parallel.algorithms.tournament import (
-    is_reachable,
-    tournament_is_strongly_connected,
-)
-from nx_parallel.algorithms.vitality import closeness_vitality
-from nx_parallel.algorithms.approximation.connectivity import (
-    approximate_all_pairs_node_connectivity,
-)
-from nx_parallel.algorithms.connectivity import connectivity
-from nx_parallel.algorithms.cluster import square_clustering
+from operator import attrgetter
 
 import networkx as nx
 
+from nx_parallel import algorithms
 
 __all__ = ["BackendInterface", "ParallelGraph"]
 
+ALGORITHMS = [
+    # Bipartite
+    "node_redundancy",
+    # Isolates
+    "number_of_isolates",
+    # Vitality
+    "closeness_vitality",
+    # Tournament
+    "is_reachable",
+    "tournament_is_strongly_connected",
+    # Centrality
+    "betweenness_centrality",
+    "edge_betweenness_centrality",
+    # Efficiency
+    "local_efficiency",
+    # Shortest Paths : generic
+    "all_pairs_all_shortest_paths",
+    # Shortest Paths : weighted graphs
+    "all_pairs_dijkstra",
+    "all_pairs_dijkstra_path_length",
+    "all_pairs_dijkstra_path",
+    "all_pairs_bellman_ford_path_length",
+    "all_pairs_bellman_ford_path",
+    "johnson",
+    # Clustering
+    "square_clustering",
+    # Shortest Paths : unweighted graphs
+    "all_pairs_shortest_path",
+    "all_pairs_shortest_path_length",
+    # Approximation
+    "approximate_all_pairs_node_connectivity",
+    # Connectivity
+    "connectivity.all_pairs_node_connectivity",
+]
+
+
 class ParallelGraph:
     """A wrapper class for networkx.Graph, networkx.DiGraph, networkx.MultiGraph,
-    and networkx.MultiDiGraph."""
+    and networkx.MultiDiGraph.
+    """
 
     __networkx_backend__ = "parallel"
 
     def __init__(self, graph_object=None):
         if graph_object is None:
             self.graph_object = nx.Graph()
-        elif isinstance(
-            graph_object, (nx.Graph, nx.DiGraph, nx.MultiGraph, nx.MultiDiGraph)
-        ):
+        elif isinstance(graph_object, nx.Graph):
             self.graph_object = graph_object
         else:
             self.graph_object = nx.Graph(graph_object)
@@ -56,62 +63,27 @@ def is_directed(self):
         return self.graph_object.is_directed()
 
     def __str__(self):
-        return "Parallel" + str(self.graph_object)
-
-
-class BackendInterface:
-    """BackendInterface class for parallel algorithms."""
+        return f"Parallel{self.graph_object}"
 
-    # Bipartite
-    node_redundancy = node_redundancy
 
-    # Isolates
-    number_of_isolates = number_of_isolates
+def assign_algorithms(cls):
+    """Class decorator to assign algorithms to the class attributes."""
+    for attr in ALGORITHMS:
+        # get the function name by parsing the module hierarchy
+        func_name = attr.rsplit(".", 1)[-1]
+        setattr(cls, func_name, attrgetter(attr)(algorithms))
+    return cls
 
-    # Vitality
-    closeness_vitality = closeness_vitality
 
-    # Tournament
-    is_reachable = is_reachable
-    tournament_is_strongly_connected = tournament_is_strongly_connected
-
-    # Centrality
-    betweenness_centrality = betweenness_centrality
-    edge_betweenness_centrality = edge_betweenness_centrality
-
-    # Efficiency
-    local_efficiency = local_efficiency
-
-    # Shortest Paths : generic
-    all_pairs_all_shortest_paths = all_pairs_all_shortest_paths
-
-    # Shortest Paths : weighted graphs
-    all_pairs_dijkstra = all_pairs_dijkstra
-    all_pairs_dijkstra_path_length = all_pairs_dijkstra_path_length
-    all_pairs_dijkstra_path = all_pairs_dijkstra_path
-    all_pairs_bellman_ford_path_length = all_pairs_bellman_ford_path_length
-    all_pairs_bellman_ford_path = all_pairs_bellman_ford_path
-    johnson = johnson
-
-    # Clustering
-    square_clustering = square_clustering
-
-    # Shortest Paths : unweighted graphs
-    all_pairs_shortest_path = all_pairs_shortest_path
-    all_pairs_shortest_path_length = all_pairs_shortest_path_length
-
-    # Approximation
-    approximate_all_pairs_node_connectivity = approximate_all_pairs_node_connectivity
-
-    # Connectivity
-    all_pairs_node_connectivity = connectivity.all_pairs_node_connectivity
-
-    # =============================
+@assign_algorithms
+class BackendInterface:
+    """BackendInterface class for parallel algorithms."""
 
     @staticmethod
    def convert_from_nx(graph, *args, **kwargs):
        """Convert a networkx.Graph, networkx.DiGraph, networkx.MultiGraph,
-        or networkx.MultiDiGraph to a ParallelGraph."""
+        or networkx.MultiDiGraph to a ParallelGraph.
+        """
         if isinstance(graph, ParallelGraph):
             return graph
         return ParallelGraph(graph)
@@ -119,7 +91,8 @@ def convert_from_nx(graph, *args, **kwargs):
     @staticmethod
     def convert_to_nx(result, *, name=None):
         """Convert a ParallelGraph to a networkx.Graph, networkx.DiGraph,
-        networkx.MultiGraph, or networkx.MultiDiGraph."""
+        networkx.MultiGraph, or networkx.MultiDiGraph.
+        """
         if isinstance(result, ParallelGraph):
             return result.graph_object
         return result
diff --git a/nx_parallel/tests/test_get_chunks.py b/nx_parallel/tests/test_get_chunks.py
index fbf6f3e..c8e63f9 100644
--- a/nx_parallel/tests/test_get_chunks.py
+++ b/nx_parallel/tests/test_get_chunks.py
@@ -2,11 +2,12 @@
 import inspect
 import importlib
-import networkx as nx
-import nx_parallel as nxp
 import random
 import types
 import math
+import networkx as nx
+
+import nx_parallel as nxp
 
 
 def get_all_functions(package_name="nx_parallel"):
diff --git a/nx_parallel/utils/chunk.py b/nx_parallel/utils/chunk.py
index 3ac1117..5a9052d 100644
--- a/nx_parallel/utils/chunk.py
+++ b/nx_parallel/utils/chunk.py
@@ -2,65 +2,94 @@
 import os
 import networkx as nx
 
+
 __all__ = ["chunks", "get_n_jobs", "create_iterables"]
 
 
-def chunks(iterable, n):
-    """Divides an iterable into chunks of size n"""
+def chunks(iterable, n_chunks):
+    """Yield exactly `n_chunks` chunks from `iterable`, balancing the chunk sizes."""
+    iterable = list(iterable)
+    k, m = divmod(len(iterable), n_chunks)
     it = iter(iterable)
-    while True:
-        x = tuple(itertools.islice(it, n))
-        if not x:
-            return
-        yield x
+    for _ in range(n_chunks):
+        chunk_size = k + (1 if m > 0 else 0)
+        m -= 1
+        yield tuple(itertools.islice(it, chunk_size))
 
 
 def get_n_jobs(n_jobs=None):
-    """Returns the positive value of `n_jobs`."""
+    """Get the positive value of `n_jobs`
+
+    Returns the positive value of `n_jobs` by either extracting it from the
+    active configuration system or modifying the passed-in value, similar to
+    joblib's behavior.
+
+    - If running under pytest, it returns 2 jobs.
+    - If the `active` configuration in NetworkX's config is `True`, `n_jobs`
+      is extracted from the NetworkX config.
+    - Otherwise, `n_jobs` is obtained from joblib's active backend.
+    - `ValueError` is raised if `n_jobs` is 0.
+    """
     if "PYTEST_CURRENT_TEST" in os.environ:
         return 2
-    else:
+
+    if n_jobs is None:
         if nx.config.backends.parallel.active:
             n_jobs = nx.config.backends.parallel.n_jobs
         else:
             from joblib.parallel import get_active_backend
 
             n_jobs = get_active_backend()[1]
-        n_cpus = os.cpu_count()
-        if n_jobs is None:
-            return 1
-        if n_jobs < 0:
-            return n_cpus + n_jobs + 1
-        if n_jobs == 0:
-            raise ValueError("n_jobs == 0 in Parallel has no meaning")
-        return int(n_jobs)
+
+    if n_jobs is None:
+        return 1
+    if n_jobs < 0:
+        return os.cpu_count() + n_jobs + 1
+
+    if n_jobs == 0:
+        raise ValueError("n_jobs == 0 in Parallel has no meaning")
+
+    return int(n_jobs)
 
 
 def create_iterables(G, iterator, n_cores, list_of_iterator=None):
-    """Creates an iterable of function inputs for parallel computation
+    """Create an iterable of function inputs for parallel computation
     based on the provided iterator type.
 
     Parameters
-    -----------
+    ----------
     G : NetworkX graph
+        The NetworkX graph.
     iterator : str
         Type of iterator. Valid values are 'node', 'edge', 'isolate'
+    n_cores : int
+        The number of cores to use.
+    list_of_iterator : list, optional
+        A precomputed list of items to iterate over. If None, it will
+        be generated based on the iterator type.
 
-    Returns:
-    --------
+    Returns
+    -------
     iterable : Iterable
         An iterable of function inputs.
+
+    Raises
+    ------
+    ValueError
+        If the iterator type is not one of "node", "edge" or "isolate".
     """
-    if iterator in ["node", "edge", "isolate"]:
-        if list_of_iterator is None:
-            if iterator == "node":
-                list_of_iterator = list(G.nodes)
-            elif iterator == "edge":
-                list_of_iterator = list(G.edges)
-            elif iterator == "isolate":
-                list_of_iterator = list(nx.isolates(G))
-        num_in_chunk = max(len(list_of_iterator) // n_cores, 1)
-        return chunks(list_of_iterator, num_in_chunk)
-    else:
-        raise ValueError("Invalid iterator type.")
+    if not list_of_iterator:
+        if iterator == "node":
+            list_of_iterator = list(G.nodes)
+        elif iterator == "edge":
+            list_of_iterator = list(G.edges)
+        elif iterator == "isolate":
+            list_of_iterator = list(nx.isolates(G))
+        else:
+            raise ValueError(f"Invalid iterator type: {iterator}")
+
+    if not list_of_iterator:
+        return iter([])
+
+    return chunks(list_of_iterator, n_cores)
diff --git a/nx_parallel/utils/decorators.py b/nx_parallel/utils/decorators.py
index c5bb7f8..f861c84 100644
--- a/nx_parallel/utils/decorators.py
+++ b/nx_parallel/utils/decorators.py
@@ -1,17 +1,17 @@
+import os
 from dataclasses import asdict
-from joblib import parallel_config
-import networkx as nx
 from functools import wraps
-import os
+
+import networkx as nx
+from joblib import parallel_config
+
 
-__all__ = [
-    "_configure_if_nx_active",
-]
+__all__ = ["_configure_if_nx_active"]
 
 
 def _configure_if_nx_active():
     """Decorator to set the configuration for the parallel computation
-    of the nx-parallel algorithms."""
+    of the nx-parallel algorithms.
+    """
 
     def decorator(func):
         @wraps(func)
@@ -20,16 +20,14 @@ def wrapper(*args, **kwargs):
                 nx.config.backends.parallel.active
                 or "PYTEST_CURRENT_TEST" in os.environ
             ):
-                # to activate nx config system in nx_parallel use:
+                # Activate nx config system in nx_parallel with:
                 # `nx.config.backends.parallel.active = True`
                 config_dict = asdict(nx.config.backends.parallel)
-                del config_dict["active"]
-                config_dict.update(config_dict["backend_params"])
-                del config_dict["backend_params"]
+                config_dict.update(config_dict.pop("backend_params"))
+                config_dict.pop("active", None)
                 with parallel_config(**config_dict):
                     return func(*args, **kwargs)
-            else:
-                return func(*args, **kwargs)
+            return func(*args, **kwargs)
 
         return wrapper
diff --git a/nx_parallel/utils/tests/__init__.py b/nx_parallel/utils/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/nx_parallel/utils/tests/test_chunk.py b/nx_parallel/utils/tests/test_chunk.py
new file mode 100644
index 0000000..45ef9ae
--- /dev/null
+++ b/nx_parallel/utils/tests/test_chunk.py
@@ -0,0 +1,61 @@
+import os
+import pytest
+import networkx as nx
+import nx_parallel as nxp
+
+
+def test_get_n_jobs():
+    """Test for various scenarios in `get_n_jobs`."""
+    # Test with no n_jobs (default)
+    with pytest.MonkeyPatch().context() as mp:
+        mp.delitem(os.environ, "PYTEST_CURRENT_TEST", raising=False)
+        assert nxp.get_n_jobs() == 1
+
+        # Test with n_jobs set to positive value
+        assert nxp.get_n_jobs(4) == 4
+
+        # Test with n_jobs set to negative value
+        assert nxp.get_n_jobs(-1) == os.cpu_count()
+        nx.config.backends.parallel.active = False
+        from joblib import parallel_config
+
+        parallel_config(n_jobs=3)
+        assert nxp.get_n_jobs() == 3
+        nx.config.backends.parallel.active = True
+        nx.config.backends.parallel.n_jobs = 5
+        assert nxp.get_n_jobs() == 5
+        # Test with n_jobs = 0 to raise a ValueError
+        try:
+            nxp.get_n_jobs(0)
+        except ValueError as e:
+            assert str(e) == "n_jobs == 0 in Parallel has no meaning"
+
+
+def test_chunks():
+    """Test `chunks` for various input scenarios."""
+    data = list(range(10))
+
+    # Test chunking with exactly 2 larger chunks (balanced)
+    chunks_list = list(nxp.chunks(data, 2))
+    assert chunks_list == [(0, 1, 2, 3, 4), (5, 6, 7, 8, 9)]
+
+    # Test chunking into 5 smaller chunks
+    chunks_list = list(nxp.chunks(data, 5))
+    assert chunks_list == [(0, 1), (2, 3), (4, 5), (6, 7), (8, 9)]
+
+
+def test_create_iterables():
+    """Test `create_iterables` for different iterator types."""
+    G = nx.fast_gnp_random_graph(50, 0.6, seed=42)
+
+    # Test node iterator
+    iterable = nxp.create_iterables(G, "node", 4)
+    assert len(list(iterable)) == 4
+
+    # Test edge iterator
+    iterable = nxp.create_iterables(G, "edge", 4)
+    assert len(list(iterable)) == 4
+
+    # Test isolate iterator (G has no isolates, so this should be empty)
+    iterable = nxp.create_iterables(G, "isolate", 4)
+    assert len(list(iterable)) == 0
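
Note on the interface.py change above: `BackendInterface` no longer assigns each algorithm by hand; the `assign_algorithms` class decorator resolves every (possibly dotted) entry in `ALGORITHMS` against the `nx_parallel.algorithms` namespace with `operator.attrgetter`. The sketch below illustrates only that lookup mechanism; the stand-in `algorithms` namespace and its lambdas are hypothetical placeholders, not the real nx-parallel module.

# Illustrative sketch of the attrgetter-based assignment (stand-in namespace, not nx_parallel).
from operator import attrgetter
import types

algorithms = types.SimpleNamespace(
    square_clustering=lambda G: "parallel square_clustering",
    connectivity=types.SimpleNamespace(
        all_pairs_node_connectivity=lambda G: "parallel all_pairs_node_connectivity"
    ),
)

ALGORITHMS = ["square_clustering", "connectivity.all_pairs_node_connectivity"]


def assign_algorithms(cls):
    for attr in ALGORITHMS:
        # The attribute name on the backend class is the last path segment.
        setattr(cls, attr.rsplit(".", 1)[-1], attrgetter(attr)(algorithms))
    return cls


@assign_algorithms
class BackendInterface:
    pass


print(BackendInterface.all_pairs_node_connectivity(None))
# -> parallel all_pairs_node_connectivity

Because the attribute name is taken from the last path segment, the dotted entry still surfaces as `BackendInterface.all_pairs_node_connectivity`, matching what the hand-written class exposed before.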
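
Also note that `chunks` changes meaning in this diff: its second argument is now the number of chunks to produce (with sizes balanced via `divmod`), not the size of each chunk, and `create_iterables` now passes `n_cores` straight through to it. Below is a minimal standalone sketch of the same balancing logic; the 3-chunk split is an illustrative input, not one of the cases in the new test file.

# Standalone sketch mirroring the new `chunks` logic from nx_parallel/utils/chunk.py.
import itertools


def chunks(iterable, n_chunks):
    """Yield exactly `n_chunks` tuples, spreading any remainder over the first chunks."""
    iterable = list(iterable)
    k, m = divmod(len(iterable), n_chunks)  # base chunk size k, remainder m
    it = iter(iterable)
    for _ in range(n_chunks):
        chunk_size = k + (1 if m > 0 else 0)
        m -= 1
        yield tuple(itertools.islice(it, chunk_size))


print(list(chunks(range(10), 3)))
# [(0, 1, 2, 3), (4, 5, 6), (7, 8, 9)]  -- 10 items over 3 chunks: sizes 4, 3, 3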