diff --git a/benchmarks/benchmarks/bench_cluster.py b/benchmarks/benchmarks/bench_cluster.py
new file mode 100644
index 0000000..c4e9249
--- /dev/null
+++ b/benchmarks/benchmarks/bench_cluster.py
@@ -0,0 +1,17 @@
+from .common import (
+    backends,
+    num_nodes,
+    edge_prob,
+    get_cached_gnp_random_graph,
+    Benchmark,
+)
+import networkx as nx
+
+
+class Cluster(Benchmark):
+    params = [backends, num_nodes, edge_prob]
+    param_names = ["backend", "num_nodes", "edge_prob"]
+
+    def time_square_clustering(self, backend, num_nodes, edge_prob):
+        G = get_cached_gnp_random_graph(num_nodes, edge_prob)
+        _ = nx.square_clustering(G, backend=backend)
diff --git a/nx_parallel/algorithms/__init__.py b/nx_parallel/algorithms/__init__.py
index ba36483..58410de 100644
--- a/nx_parallel/algorithms/__init__.py
+++ b/nx_parallel/algorithms/__init__.py
@@ -7,3 +7,4 @@
 from .isolate import *
 from .tournament import *
 from .vitality import *
+from .cluster import *
diff --git a/nx_parallel/algorithms/cluster.py b/nx_parallel/algorithms/cluster.py
new file mode 100644
index 0000000..8218d12
--- /dev/null
+++ b/nx_parallel/algorithms/cluster.py
@@ -0,0 +1,66 @@
+from itertools import combinations, chain
+from joblib import Parallel, delayed
+import nx_parallel as nxp
+
+__all__ = [
+    "square_clustering",
+]
+
+
+def square_clustering(G, nodes=None, get_chunks="chunks"):
+    """The nodes are chunked into `node_chunks` and then the square clustering
+    coefficients for all `node_chunks` are computed in parallel over all
+    available CPU cores.
+
+    Parameters
+    ----------
+    get_chunks : str, function (default = "chunks")
+        A function that takes in a list of all the nodes (or nbunch) as input and
+        returns an iterable `node_chunks`. The default chunking is done by slicing the
+        `nodes` into `n` chunks, where `n` is the number of CPU cores.
+
+    networkx.square_clustering: https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.cluster.square_clustering.html
+    """
+
+    def _compute_clustering_chunk(node_iter_chunk):
+        result_chunk = []
+        for v in node_iter_chunk:
+            clustering = 0
+            potential = 0
+            for u, w in combinations(G[v], 2):
+                squares = len((set(G[u]) & set(G[w])) - {v})
+                clustering += squares
+                degm = squares + 1
+                if w in G[u]:
+                    degm += 1
+                potential += (len(G[u]) - degm) + (len(G[w]) - degm) + squares
+            if potential > 0:
+                clustering /= potential
+            result_chunk.append((v, clustering))
+        return result_chunk
+
+    if hasattr(G, "graph_object"):
+        G = G.graph_object
+
+    if nodes is None:
+        node_iter = list(G)
+    else:
+        node_iter = list(G.nbunch_iter(nodes))
+
+    total_cores = nxp.cpu_count()
+
+    if get_chunks == "chunks":
+        num_in_chunk = max(len(node_iter) // total_cores, 1)
+        node_iter_chunks = nxp.chunks(node_iter, num_in_chunk)
+    else:
+        node_iter_chunks = get_chunks(node_iter)
+
+    result = Parallel(n_jobs=total_cores)(
+        delayed(_compute_clustering_chunk)(node_iter_chunk)
+        for node_iter_chunk in node_iter_chunks
+    )
+    clustering = dict(chain.from_iterable(result))
+
+    if nodes in G:
+        return clustering[nodes]
+    return clustering
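For anyone reviewing the `get_chunks` hook added above: any callable that maps the node list to an iterable of chunks can be passed in place of the default. Below is a minimal sketch, not part of the PR; the `four_chunks` helper is hypothetical, and the snippet assumes this branch is installed so that `square_clustering` is importable as the diff defines it.

```python
import networkx as nx
from nx_parallel.algorithms.cluster import square_clustering

G = nx.fast_gnp_random_graph(500, 0.4, seed=42)


# Hypothetical chunking function: four equal slices of the node list,
# instead of the default one-slice-per-CPU-core split.
def four_chunks(nodes):
    size = max(len(nodes) // 4, 1)
    return [nodes[i : i + size] for i in range(0, len(nodes), size)]


clustering = square_clustering(G, get_chunks=four_chunks)
```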
diff --git a/nx_parallel/interface.py b/nx_parallel/interface.py
index d492029..39069ff 100644
--- a/nx_parallel/interface.py
+++ b/nx_parallel/interface.py
@@ -7,6 +7,7 @@
     tournament_is_strongly_connected,
 )
 from nx_parallel.algorithms.vitality import closeness_vitality
+from nx_parallel.algorithms.cluster import square_clustering
 
 __all__ = ["Dispatcher", "ParallelGraph"]
 
@@ -49,6 +50,9 @@ class Dispatcher:
     # Shortest Paths : all pairs shortest paths(bellman_ford)
     all_pairs_bellman_ford_path = all_pairs_bellman_ford_path
 
+    # Clustering
+    square_clustering = square_clustering
+
     # =============================
 
     @staticmethod
diff --git a/timing/heatmap_square_clustering_timing.png b/timing/heatmap_square_clustering_timing.png
new file mode 100644
index 0000000..d76a452
Binary files /dev/null and b/timing/heatmap_square_clustering_timing.png differ
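With `square_clustering` registered on the `Dispatcher` above, NetworkX's backend machinery can route calls to this implementation; that is exactly how the new `bench_cluster.py` benchmark invokes it. A minimal sketch of the two entry points, assuming a dispatching-enabled NetworkX and this branch of nx-parallel installed:

```python
import networkx as nx
import nx_parallel as nxp

G = nx.fast_gnp_random_graph(300, 0.5, seed=42)

# Explicit backend selection, as in the benchmark above.
par = nx.square_clustering(G, backend="parallel")

# Implicit dispatch via the wrapped graph type, as in the timing script below.
par_wrapped = nx.square_clustering(nxp.ParallelGraph(G))
```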
diff --git a/timing/timing_individual_function.py b/timing/timing_individual_function.py
index ecff35c..6959344 100644
--- a/timing/timing_individual_function.py
+++ b/timing/timing_individual_function.py
@@ -7,34 +7,36 @@
 import seaborn as sns
 from matplotlib import pyplot as plt
 
-import nx_parallel
+import nx_parallel as nxp
 
 # Code to create README heatmaps for individual function currFun
 heatmapDF = pd.DataFrame()
 number_of_nodes_list = [10, 50, 100, 300, 500]
 pList = [1, 0.8, 0.6, 0.4, 0.2]
-currFun = nx.all_pairs_bellman_ford_path
+weighted = False
+currFun = nx.square_clustering
 for p in pList:
     for num in number_of_nodes_list:
         # create original and parallel graphs
         G = nx.fast_gnp_random_graph(num, p, seed=42, directed=False)
 
         # for weighted graphs
-        random.seed(42)
-        for u, v in G.edges():
-            G[u][v]["weight"] = random.random()
+        if weighted:
+            random.seed(42)
+            for u, v in G.edges():
+                G[u][v]["weight"] = random.random()
 
-        H = nx_parallel.ParallelGraph(G)
+        H = nxp.ParallelGraph(G)
 
         # time both versions and update heatmapDF
         t1 = time.time()
         c = currFun(H)
         if isinstance(c, types.GeneratorType):
             d = dict(c)
         t2 = time.time()
         parallelTime = t2 - t1
         t1 = time.time()
         c = currFun(G)
         if isinstance(c, types.GeneratorType):
             d = dict(c)
         t2 = time.time()
@@ -73,7 +75,7 @@
 plt.xticks(rotation=45)
 plt.yticks(rotation=20)
 plt.title(
-    "Small Scale Demo: Times Speedups of " + currFun.__name__ + " compared to networkx"
+    "Small Scale Demo: Time Speedups of " + currFun.__name__ + " compared to NetworkX"
 )
 plt.xlabel("Number of Vertices")
 plt.ylabel("Edge Probability")
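Since each node's coefficient is computed entirely within one chunk, the parallel result should match sequential NetworkX. A sanity check worth running alongside the timing script, under the same assumptions as the sketches above:

```python
import networkx as nx
import nx_parallel as nxp

G = nx.fast_gnp_random_graph(100, 0.6, seed=42)

seq = nx.square_clustering(G)
par = nx.square_clustering(nxp.ParallelGraph(G))

# The same per-node arithmetic runs in both versions, so the coefficients
# should agree up to floating-point round-off at worst.
assert seq.keys() == par.keys()
assert all(abs(seq[v] - par[v]) < 1e-12 for v in G)
```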