Merge branch 'main' into shortest_paths
dschult committed Mar 10, 2024
2 parents a96dbff + 882646b commit 34ef348
Showing 13 changed files with 170 additions and 13 deletions.
1 change: 0 additions & 1 deletion .github/workflows/test.yml
@@ -38,7 +38,6 @@ jobs:
          echo "Done with installing"
      - name: PyTest
        run: |
-         NETWORKX_GRAPH_CONVERT=parallel \
          NETWORKX_TEST_BACKEND=parallel \
          NETWORKX_FALLBACK_TO_NX=True \
          python -m pytest --pyargs networkx
1 change: 0 additions & 1 deletion README.md
@@ -61,7 +61,6 @@ To run tests for the project, use the following command:

```
PYTHONPATH=. \
-NETWORKX_GRAPH_CONVERT=parallel \
NETWORKX_TEST_BACKEND=parallel \
NETWORKX_FALLBACK_TO_NX=True \
pytest --pyargs networkx "$@"
Expand Down
3 changes: 2 additions & 1 deletion benchmarks/asv.conf.json
@@ -16,5 +16,6 @@
    "env_dir": "env",
    "results_dir": "results",
    "html_dir": "html",
-   "build_cache_size": 8
+   "build_cache_size": 8,
+   "default_benchmark_timeout": 1200,
}
17 changes: 17 additions & 0 deletions benchmarks/benchmarks/bench_cluster.py
@@ -0,0 +1,17 @@
from .common import (
    backends,
    num_nodes,
    edge_prob,
    get_cached_gnp_random_graph,
    Benchmark,
)
import networkx as nx


class Cluster(Benchmark):
    params = [(backends), (num_nodes), (edge_prob)]
    param_names = ["backend", "num_nodes", "edge_prob"]

    def time_square_clustering(self, backend, num_nodes, edge_prob):
        G = get_cached_gnp_random_graph(num_nodes, edge_prob)
        _ = nx.square_clustering(G, backend=backend)
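For context (not part of the diff): ASV times every combination of the `backend`, `num_nodes`, and `edge_prob` parameter lists, so each cell of the resulting grid measures roughly the call below. The graph size, edge probability, and seed here are illustrative assumptions, not values from `common.py`.

```python
# Rough sketch of what a single (backend, num_nodes, edge_prob) benchmark cell times.
# Graph parameters and seed are assumed for illustration only.
import networkx as nx

G = nx.fast_gnp_random_graph(300, 0.6, seed=42)  # stand-in for get_cached_gnp_random_graph(300, 0.6)
nx.square_clustering(G, backend="parallel")      # the "parallel" cell (needs nx-parallel installed)
nx.square_clustering(G)                          # the baseline NetworkX cell
```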
4 changes: 4 additions & 0 deletions benchmarks/benchmarks/bench_shortest_paths.py
@@ -53,3 +53,7 @@ def time_all_pairs_bellman_ford_path_length(self, backend, num_nodes, edge_prob)
    def time_all_pairs_bellman_ford_path(self, backend, num_nodes, edge_prob):
        G = get_cached_gnp_random_graph(num_nodes, edge_prob, is_weighted=True)
        _ = dict(nx.all_pairs_bellman_ford_path(G, backend=backend))

    def time_johnson(self, backend, num_nodes, edge_prob):
        G = get_cached_gnp_random_graph(num_nodes, edge_prob, is_weighted=True)
        _ = nx.johnson(G, backend=backend)
1 change: 1 addition & 0 deletions nx_parallel/algorithms/__init__.py
@@ -9,3 +9,4 @@
from .isolate import *
from .tournament import *
from .vitality import *
from .cluster import *
66 changes: 66 additions & 0 deletions nx_parallel/algorithms/cluster.py
@@ -0,0 +1,66 @@
from itertools import combinations, chain
from joblib import Parallel, delayed
import nx_parallel as nxp

__all__ = [
    "square_clustering",
]


def square_clustering(G, nodes=None, get_chunks="chunks"):
    """The nodes are chunked into `node_chunks` and then the square clustering
    coefficients for all `node_chunks` are computed in parallel over all available
    CPU cores.

    Parameters
    ----------
    get_chunks : str, function (default = "chunks")
        A function that takes in a list of all the nodes (or nbunch) as input and
        returns an iterable `node_chunks`. The default chunking is done by slicing the
        `nodes` into `n` chunks, where `n` is the number of CPU cores.

    networkx.square_clustering: https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.cluster.square_clustering.html
    """

    def _compute_clustering_chunk(node_iter_chunk):
        result_chunk = []
        for v in node_iter_chunk:
            clustering = 0
            potential = 0
            for u, w in combinations(G[v], 2):
                squares = len((set(G[u]) & set(G[w])) - {v})
                clustering += squares
                degm = squares + 1
                if w in G[u]:
                    degm += 1
                potential += (len(G[u]) - degm) + (len(G[w]) - degm) + squares
            if potential > 0:
                clustering /= potential
            result_chunk += [(v, clustering)]
        return result_chunk

    if hasattr(G, "graph_object"):
        G = G.graph_object

    if nodes is None:
        node_iter = list(G)
    else:
        node_iter = list(G.nbunch_iter(nodes))

    total_cores = nxp.cpu_count()

    if get_chunks == "chunks":
        num_in_chunk = max(len(node_iter) // total_cores, 1)
        node_iter_chunks = nxp.chunks(node_iter, num_in_chunk)
    else:
        node_iter_chunks = get_chunks(node_iter)

    result = Parallel(n_jobs=total_cores)(
        delayed(_compute_clustering_chunk)(node_iter_chunk)
        for node_iter_chunk in node_iter_chunks
    )
    clustering = dict(chain.from_iterable(result))

    if nodes in G:
        return clustering[nodes]
    return clustering
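A usage sketch (not part of the commit): the `get_chunks` hook described in the docstring lets a caller replace the default one-chunk-per-core slicing. The graph size, seed, and the `two_chunks` helper below are illustrative assumptions; it also assumes nx-parallel is installed and registered as the "parallel" backend.

```python
# Sketch: default chunking via the "parallel" backend vs. a custom get_chunks function.
import networkx as nx
from nx_parallel.algorithms.cluster import square_clustering

G = nx.fast_gnp_random_graph(200, 0.3, seed=42)

# Default: nodes are sliced into one chunk per available CPU core.
default_result = nx.square_clustering(G, backend="parallel")

def two_chunks(nodes):
    # Split the node list into two halves, regardless of core count.
    nodes = list(nodes)
    mid = len(nodes) // 2
    return [nodes[:mid], nodes[mid:]]

custom_result = square_clustering(G, get_chunks=two_chunks)
assert default_result == custom_result  # chunking changes scheduling, not the result
```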
54 changes: 54 additions & 0 deletions nx_parallel/algorithms/shortest_paths/weighted.py
@@ -10,6 +10,9 @@
    single_source_dijkstra_path,
    single_source_bellman_ford_path,
    single_source_bellman_ford_path_length,
    _weight_function,
    _dijkstra,
    _bellman_ford,
)

__all__ = [
@@ -18,6 +21,7 @@
    "all_pairs_dijkstra_path",
    "all_pairs_bellman_ford_path_length",
    "all_pairs_bellman_ford_path",
    "johnson",
]


@@ -223,3 +227,53 @@ def _calculate_shortest_paths_subset(source):
        delayed(_calculate_shortest_paths_subset)(source) for source in nodes
    )
    return paths


def johnson(G, weight="weight", get_chunks="chunks"):
    """The parallel computation is implemented by dividing the
    nodes into chunks and computing the shortest paths using Johnson's Algorithm
    for each chunk in parallel.

    Parameters
    ----------
    get_chunks : str, function (default = "chunks")
        A function that takes in an iterable of all the nodes as input and returns
        an iterable `node_chunks`. The default chunking is done by slicing the
        `G.nodes` into `n` chunks, where `n` is the number of CPU cores.

    networkx.johnson : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.shortest_paths.weighted.johnson.html#johnson
    """
    if hasattr(G, "graph_object"):
        G = G.graph_object

    dist = {v: 0 for v in G}
    pred = {v: [] for v in G}
    weight = _weight_function(G, weight)

    # Calculate distance of shortest paths
    dist_bellman = _bellman_ford(G, list(G), weight, pred=pred, dist=dist)

    # Update the weight function to take into account the Bellman--Ford
    # relaxation distances.
    def new_weight(u, v, d):
        return weight(u, v, d) + dist_bellman[u] - dist_bellman[v]

    def dist_path(v):
        paths = {v: [v]}
        _dijkstra(G, v, new_weight, paths=paths)
        return paths

    def _johnson_subset(chunk):
        return {node: dist_path(node) for node in chunk}

    total_cores = nxp.cpu_count()
    if get_chunks == "chunks":
        num_in_chunk = max(len(G.nodes) // total_cores, 1)
        node_chunks = nxp.chunks(G.nodes, num_in_chunk)
    else:
        node_chunks = get_chunks(G.nodes)

    results = Parallel(n_jobs=total_cores)(
        delayed(_johnson_subset)(chunk) for chunk in node_chunks
    )
    return {v: d_path for result_chunk in results for v, d_path in result_chunk.items()}
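For orientation (not in the diff): `new_weight` above is the standard Johnson reweighting w'(u, v) = w(u, v) + h(u) - h(v), with h taken from the single Bellman-Ford pass, which makes all edge weights non-negative so Dijkstra can then run from every source, one chunk of sources per job. A minimal usage sketch, assuming nx-parallel is registered as the "parallel" backend; the graph, seed, and weights are illustrative.

```python
# Sketch: parallel johnson through the NetworkX backend dispatcher.
import random
import networkx as nx

random.seed(42)
G = nx.fast_gnp_random_graph(100, 0.4, seed=42, directed=True)
for u, v in G.edges():
    G[u][v]["weight"] = random.random()  # non-negative weights for this example

paths_parallel = nx.johnson(G, backend="parallel")
paths_serial = nx.johnson(G)
# Both return {source: {target: shortest_path_as_list}}.
assert paths_parallel.keys() == paths_serial.keys()
```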
6 changes: 6 additions & 0 deletions nx_parallel/interface.py
@@ -6,6 +6,7 @@
    all_pairs_dijkstra_path,
    all_pairs_bellman_ford_path_length,
    all_pairs_bellman_ford_path,
    johnson,
)
from nx_parallel.algorithms.shortest_paths.unweighted import (
    all_pairs_shortest_path,
@@ -22,6 +23,7 @@
    all_pairs_node_connectivity,
)
from nx_parallel.algorithms.connectivity import connectivity
from nx_parallel.algorithms.cluster import square_clustering

__all__ = ["Dispatcher", "ParallelGraph"]

@@ -70,6 +72,10 @@ class Dispatcher:
    all_pairs_dijkstra_path = all_pairs_dijkstra_path
    all_pairs_bellman_ford_path_length = all_pairs_bellman_ford_path_length
    all_pairs_bellman_ford_path = all_pairs_bellman_ford_path
    johnson = johnson

    # Clustering
    square_clustering = square_clustering

    # Shortest Paths : unweighted graphs
    all_pairs_shortest_path = all_pairs_shortest_path
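For reference (not part of the diff): registering `johnson` and `square_clustering` on the `Dispatcher` is what lets ordinary NetworkX calls reach nx-parallel, either by wrapping the graph in a `ParallelGraph` or by naming the backend explicitly. A small sketch, assuming nx-parallel is installed as the "parallel" backend; the graph parameters are illustrative.

```python
# Sketch: two equivalent ways a call is routed to the Dispatcher entries above.
import networkx as nx
import nx_parallel as nxp

G = nx.fast_gnp_random_graph(50, 0.5, seed=42)

H = nxp.ParallelGraph(G)                             # wrapped graph dispatches automatically
res1 = nx.square_clustering(H)

res2 = nx.square_clustering(G, backend="parallel")   # or request the backend by name
assert res1 == res2
```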
14 changes: 12 additions & 2 deletions nx_parallel/utils/backend.py
@@ -1,4 +1,5 @@
import nx_parallel.algorithms as algorithms
import inspect


__all__ = ["get_info"]
@@ -20,13 +21,21 @@ def get_funcs_info():
            if callable(getattr(file_module, name, None))
        ]
        for function in functions:
            try:
                func_line = inspect.getsourcelines(
                    getattr(file_module, function)
                )[1]
            except Exception as e:
                print(e)
                func_line = None
            try:
                # Extracting docstring
                docstring = getattr(file_module, function).__doc__

                try:
                    # Extracting Parallel Computation description
                    # Assuming that the first para in docstring is the function's PC desc
                    # "par" is short for "parallel"
                    par_docs_ = docstring.split("\n\n")[0]
                    par_docs_ = par_docs_.split("\n")
                    par_docs_ = [
@@ -55,8 +64,9 @@
                par_docs = None

            funcs[function] = {
-                "extra_docstring": par_docs,
-                "extra_parameters": par_params,
+                "url": f"https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/{file_name}.py#{func_line}",
+                "additional_docs": par_docs,
+                "additional_parameters": par_params,
            }
        return funcs

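For illustration (not in the diff): with the renamed keys, each entry returned by `get_funcs_info` should look roughly like the sketch below. Only the key names and the URL pattern come from the code above; the values, and the inner shape of `additional_parameters`, are made up.

```python
# Hypothetical shape of one get_funcs_info() entry after the key rename.
{
    "square_clustering": {
        "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/cluster.py#10",
        "additional_docs": "The nodes are chunked into `node_chunks` and then ...",
        "additional_parameters": {
            'get_chunks : str, function (default = "chunks")': "A function that takes in ...",
        },
    },
}
```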
Binary file added timing/heatmap_johnson_timing.png
Binary file added timing/heatmap_square_clustering_timing.png
16 changes: 8 additions & 8 deletions timing/timing_individual_function.py
@@ -7,36 +7,36 @@
import seaborn as sns
from matplotlib import pyplot as plt

-import nx_parallel
+import nx_parallel as nxp

# Code to create README heatmaps for individual function currFun
heatmapDF = pd.DataFrame()
number_of_nodes_list = [10, 50, 100, 300, 500]
pList = [1, 0.8, 0.6, 0.4, 0.2]
-weight = False
-currFun = nx.all_pairs_node_connectivity
+weighted = False
+currFun = nx.square_clustering
for p in pList:
    for num in number_of_nodes_list:
        # create original and parallel graphs
        G = nx.fast_gnp_random_graph(num, p, seed=42, directed=False)

        # for weighted graphs
-        if weight:
+        if weighted:
            random.seed(42)
            for u, v in G.edges():
                G[u][v]["weight"] = random.random()

-        H = nx_parallel.ParallelGraph(G)
+        H = nxp.ParallelGraph(G)

        # time both versions and update heatmapDF
        t1 = time.time()
-        c = currFun(H)
+        c = nx.square_clustering(H)
        if isinstance(c, types.GeneratorType):
            d = dict(c)
        t2 = time.time()
        parallelTime = t2 - t1
        t1 = time.time()
-        c = currFun(G)
+        c = nx.square_clustering(G)
        if isinstance(c, types.GeneratorType):
            d = dict(c)
        t2 = time.time()
@@ -76,7 +76,7 @@
plt.xticks(rotation=45)
plt.yticks(rotation=20)
plt.title(
-    "Small Scale Demo: Times Speedups of " + currFun.__name__ + " compared to networkx"
+    "Small Scale Demo: Times Speedups of " + currFun.__name__ + " compared to NetworkX"
)
plt.xlabel("Number of Vertices")
plt.ylabel("Edge Probability")

0 comments on commit 34ef348
