Skip to content

ENH : adding parallel implementations of all_pairs_ algos #33

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
f9f1ff4
initial commit
Schefflera-Arboricola Jan 17, 2024
d7fcf95
rm extra docs
Schefflera-Arboricola Jan 19, 2024
fcf6612
added unweighted.py rm johnson and all non all_pairs_ algos from weig…
Schefflera-Arboricola Jan 23, 2024
1f68d7d
added all_pairs_node_connectivity
Schefflera-Arboricola Jan 23, 2024
94de91f
Update weighted.py
Schefflera-Arboricola Jan 24, 2024
485270d
modifying G
Schefflera-Arboricola Jan 25, 2024
a6a8f0a
fixed all_pairs_node_connectivity
Schefflera-Arboricola Jan 26, 2024
0cba23e
un-updated ParallelGraph class
Schefflera-Arboricola Jan 30, 2024
e9c6c06
style fixes
Schefflera-Arboricola Feb 1, 2024
5bc797e
changed all_pairs def
Schefflera-Arboricola Feb 7, 2024
414f2c8
adding directed to _calculate_all_pairs_node_connectivity_subset
Schefflera-Arboricola Feb 7, 2024
4f8a751
updated docs of all funcs
Schefflera-Arboricola Feb 8, 2024
00b310b
style fix
Schefflera-Arboricola Feb 8, 2024
20ec806
added benchmarks
Schefflera-Arboricola Feb 8, 2024
01e42ac
style fix
Schefflera-Arboricola Feb 8, 2024
bbbc5f2
added 6 heatmaps(no speedups in 3)
Schefflera-Arboricola Feb 18, 2024
cbda3e8
used loky backend in approximation.connectivity.all_pairs_node_connec…
Schefflera-Arboricola Feb 24, 2024
c7a341b
added get_chunks to shortest paths algos
Schefflera-Arboricola Feb 26, 2024
95b9217
added chunking in all_pairs_node_connectivity, added benchmarks, fixe…
Schefflera-Arboricola Feb 26, 2024
a96dbff
updated docstrings of all 9 funcs
Schefflera-Arboricola Feb 26, 2024
34ef348
Merge branch 'main' into shortest_paths
dschult Mar 10, 2024
07db6dc
Merge branch 'main' into shortest_paths
dschult Mar 11, 2024
5c28ad1
typo fix
Schefflera-Arboricola Mar 11, 2024
e0c000b
Merge branch 'main' into shortest_paths
Schefflera-Arboricola Mar 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions benchmarks/benchmarks/bench_approximation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from .common import (
backends,
num_nodes,
edge_prob,
get_cached_gnp_random_graph,
Benchmark,
)
import networkx as nx
import nx_parallel as nxp


class Connectivity(Benchmark):
    """Benchmark the approximate all-pairs node connectivity algorithm."""

    params = [(backends), (num_nodes), (edge_prob)]
    param_names = ["backend", "num_nodes", "edge_prob"]

    def time_approximate_all_pairs_node_connectivity(
        self, backend, num_nodes, edge_prob
    ):
        # Wrap the cached graph in a ParallelGraph only for the parallel backend.
        graph = get_cached_gnp_random_graph(num_nodes, edge_prob)
        graph = nxp.ParallelGraph(graph) if backend == "parallel" else graph
        _ = nx.algorithms.approximation.connectivity.all_pairs_node_connectivity(graph)
20 changes: 20 additions & 0 deletions benchmarks/benchmarks/bench_connectivity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from .common import (
backends,
num_nodes,
edge_prob,
get_cached_gnp_random_graph,
Benchmark,
)
import networkx as nx
import nx_parallel as nxp


class Connectivity(Benchmark):
    """Benchmark the exact (flow-based) all-pairs node connectivity algorithm."""

    params = [(backends), (num_nodes), (edge_prob)]
    param_names = ["backend", "num_nodes", "edge_prob"]

    def time_all_pairs_node_connectivity(self, backend, num_nodes, edge_prob):
        # Wrap the cached graph in a ParallelGraph only for the parallel backend.
        graph = get_cached_gnp_random_graph(num_nodes, edge_prob)
        graph = nxp.ParallelGraph(graph) if backend == "parallel" else graph
        _ = nx.algorithms.connectivity.connectivity.all_pairs_node_connectivity(graph)
38 changes: 38 additions & 0 deletions benchmarks/benchmarks/bench_shortest_paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,48 @@
import networkx as nx


class Generic(Benchmark):
    """Benchmark generic all-pairs shortest-path algorithms."""

    params = [(backends), (num_nodes), (edge_prob)]
    param_names = ["backend", "num_nodes", "edge_prob"]

    def time_all_pairs_all_shortest_paths(self, backend, num_nodes, edge_prob):
        graph = get_cached_gnp_random_graph(num_nodes, edge_prob, is_weighted=True)
        # dict() drains the generator so the full computation is timed.
        _ = dict(
            nx.all_pairs_all_shortest_paths(graph, weight="weight", backend=backend)
        )


class Unweighted(Benchmark):
    """Benchmark unweighted all-pairs shortest-path algorithms."""

    params = [(backends), (num_nodes), (edge_prob)]
    param_names = ["backend", "num_nodes", "edge_prob"]

    def time_all_pairs_shortest_path_length(self, backend, num_nodes, edge_prob):
        graph = get_cached_gnp_random_graph(num_nodes, edge_prob)
        # dict() drains the generator so the full computation is timed.
        _ = dict(nx.all_pairs_shortest_path_length(graph, backend=backend))

    def time_all_pairs_shortest_path(self, backend, num_nodes, edge_prob):
        graph = get_cached_gnp_random_graph(num_nodes, edge_prob)
        _ = dict(nx.all_pairs_shortest_path(graph, backend=backend))


class Weighted(Benchmark):
    """Benchmark weighted all-pairs shortest-path algorithms."""

    params = [(backends), (num_nodes), (edge_prob)]
    param_names = ["backend", "num_nodes", "edge_prob"]

    def _weighted_graph(self, num_nodes, edge_prob):
        # All weighted benchmarks share the same cached weighted graph.
        return get_cached_gnp_random_graph(num_nodes, edge_prob, is_weighted=True)

    def time_all_pairs_dijkstra(self, backend, num_nodes, edge_prob):
        graph = self._weighted_graph(num_nodes, edge_prob)
        _ = dict(nx.all_pairs_dijkstra(graph, backend=backend))

    def time_all_pairs_dijkstra_path_length(self, backend, num_nodes, edge_prob):
        graph = self._weighted_graph(num_nodes, edge_prob)
        _ = dict(nx.all_pairs_dijkstra_path_length(graph, backend=backend))

    def time_all_pairs_dijkstra_path(self, backend, num_nodes, edge_prob):
        graph = self._weighted_graph(num_nodes, edge_prob)
        _ = dict(nx.all_pairs_dijkstra_path(graph, backend=backend))

    def time_all_pairs_bellman_ford_path_length(self, backend, num_nodes, edge_prob):
        graph = self._weighted_graph(num_nodes, edge_prob)
        _ = dict(nx.all_pairs_bellman_ford_path_length(graph, backend=backend))

    def time_all_pairs_bellman_ford_path(self, backend, num_nodes, edge_prob):
        graph = self._weighted_graph(num_nodes, edge_prob)
        _ = dict(nx.all_pairs_bellman_ford_path(graph, backend=backend))
Expand Down
2 changes: 2 additions & 0 deletions nx_parallel/algorithms/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from .bipartite import *
from .centrality import *
from .shortest_paths import *
from .approximation import *
from .connectivity import *

# modules
from .efficiency_measures import *
Expand Down
1 change: 1 addition & 0 deletions nx_parallel/algorithms/approximation/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .connectivity import *
71 changes: 71 additions & 0 deletions nx_parallel/algorithms/approximation/connectivity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""Parallel implementations of fast approximation for node connectivity"""
import itertools
from joblib import Parallel, delayed
from networkx.algorithms.approximation.connectivity import local_node_connectivity
import nx_parallel as nxp

__all__ = [
"all_pairs_node_connectivity",
]


def all_pairs_node_connectivity(G, nbunch=None, cutoff=None, get_chunks="chunks"):
    """Parallel approximation of node connectivity for all pairs of nodes.

    The parallel implementation first builds the list of all permutations
    (for directed graphs) or combinations (for undirected graphs) of the
    nodes in `nbunch`, splits it into chunks, and lazily computes the local
    node connectivities for each chunk, using joblib's `Parallel` to run
    these computations concurrently across all available CPU cores. The
    per-chunk results are then merged into a single dictionary and returned.

    Parameters
    ------------
    get_chunks : str, function (default = "chunks")
        A function that takes in `list(iter_func(nbunch, 2))` as input and
        returns an iterable `pairs_chunks`, here `iter_func` is
        `permutations` in case of directed graphs and `combinations` in
        case of undirected graphs. The default is to create chunks by
        slicing the list into `n` chunks, where `n` is the number of CPU
        cores, such that the size of each chunk is at most 10, and at
        least 1.

    networkx.all_pairs_node_connectivity : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.approximation.connectivity.all_pairs_node_connectivity.html
    """

    if hasattr(G, "graph_object"):
        G = G.graph_object

    # An nbunch of None means "every node of G"; otherwise deduplicate.
    nbunch = G if nbunch is None else set(nbunch)

    directed = G.is_directed()
    # Directed graphs need both orderings of each pair; undirected need one.
    iter_func = itertools.permutations if directed else itertools.combinations

    all_pairs = {n: {} for n in nbunch}

    def _connectivities_for(chunk):
        # Resolve one chunk of pairs into (u, v, connectivity) triples.
        return [
            (u, v, local_node_connectivity(G, u, v, cutoff=cutoff))
            for u, v in chunk
        ]

    pairs = list(iter_func(nbunch, 2))
    n_jobs = nxp.cpu_count()
    if get_chunks == "chunks":
        chunk_size = max(min(len(pairs) // n_jobs, 10), 1)
        pairs_chunks = nxp.chunks(pairs, chunk_size)
    else:
        pairs_chunks = get_chunks(pairs)

    chunk_jobs = (
        delayed(_connectivities_for)(chunk) for chunk in pairs_chunks
    )

    for chunk_result in Parallel(n_jobs=n_jobs)(chunk_jobs):
        for u, v, connectivity in chunk_result:
            all_pairs[u][v] = connectivity
            if not directed:
                # Connectivity is symmetric for undirected graphs.
                all_pairs[v][u] = connectivity
    return all_pairs
1 change: 1 addition & 0 deletions nx_parallel/algorithms/connectivity/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .connectivity import *
80 changes: 80 additions & 0 deletions nx_parallel/algorithms/connectivity/connectivity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""
Parallel flow based connectivity algorithms
"""

import itertools
from networkx.algorithms.flow import build_residual_network
from networkx.algorithms.connectivity.utils import build_auxiliary_node_connectivity
from networkx.algorithms.connectivity.connectivity import local_node_connectivity
from joblib import Parallel, delayed
import nx_parallel as nxp

__all__ = [
"all_pairs_node_connectivity",
]


def all_pairs_node_connectivity(G, nbunch=None, flow_func=None, get_chunks="chunks"):
    """Parallel flow-based node connectivity for all pairs of nodes.

    The parallel implementation first builds the list of all permutations
    (for directed graphs) or combinations (for undirected graphs) of the
    nodes in `nbunch`, splits it into chunks, and lazily computes the local
    node connectivities for each chunk, using joblib's `Parallel` to run
    these computations concurrently across all available CPU cores. The
    per-chunk results are then merged into a single dictionary and returned.

    Parameters
    ------------
    get_chunks : str, function (default = "chunks")
        A function that takes in `list(iter_func(nbunch, 2))` as input and
        returns an iterable `pairs_chunks`, here `iter_func` is
        `permutations` in case of directed graphs and `combinations` in
        case of undirected graphs. The default is to create chunks by
        slicing the list into `n` chunks, where `n` is the number of CPU
        cores, such that the size of each chunk is at most 10, and at
        least 1.

    networkx.all_pairs_node_connectivity : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.connectivity.connectivity.all_pairs_node_connectivity.html
    """

    if hasattr(G, "graph_object"):
        G = G.graph_object

    # An nbunch of None means "every node of G"; otherwise deduplicate.
    nbunch = G if nbunch is None else set(nbunch)

    directed = G.is_directed()
    # Directed graphs need both orderings of each pair; undirected need one.
    iter_func = itertools.permutations if directed else itertools.combinations

    all_pairs = {n: {} for n in nbunch}

    # Build the auxiliary digraph and residual network once and reuse them
    # for every local connectivity computation.
    H = build_auxiliary_node_connectivity(G)
    R = build_residual_network(H, "capacity")
    kwargs = {"flow_func": flow_func, "auxiliary": H, "residual": R}

    def _connectivities_for(chunk):
        # Resolve one chunk of pairs into (u, v, connectivity) triples.
        return [
            (u, v, local_node_connectivity(G, u, v, **kwargs)) for u, v in chunk
        ]

    pairs = list(iter_func(nbunch, 2))
    n_jobs = nxp.cpu_count()
    if get_chunks == "chunks":
        chunk_size = max(min(len(pairs) // n_jobs, 10), 1)
        pairs_chunks = nxp.chunks(pairs, chunk_size)
    else:
        pairs_chunks = get_chunks(pairs)

    chunk_jobs = (
        delayed(_connectivities_for)(chunk) for chunk in pairs_chunks
    )

    for chunk_result in Parallel(n_jobs=n_jobs)(chunk_jobs):
        for u, v, connectivity in chunk_result:
            all_pairs[u][v] = connectivity
            if not directed:
                # Connectivity is symmetric for undirected graphs.
                all_pairs[v][u] = connectivity
    return all_pairs
2 changes: 2 additions & 0 deletions nx_parallel/algorithms/shortest_paths/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
from .generic import *
from .weighted import *
from .unweighted import *
57 changes: 57 additions & 0 deletions nx_parallel/algorithms/shortest_paths/generic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from networkx.algorithms.shortest_paths.generic import single_source_all_shortest_paths
from joblib import Parallel, delayed
import nx_parallel as nxp

__all__ = [
"all_pairs_all_shortest_paths",
]


def all_pairs_all_shortest_paths(
    G, weight=None, method="dijkstra", get_chunks="chunks"
):
    """Parallel computation of all shortest paths between all pairs of nodes.

    The parallel implementation first divides the nodes into chunks and then
    lazily computes, for every node in a chunk, all shortest paths from that
    node to every other node, using joblib's `Parallel` to run the per-chunk
    computations concurrently across all available CPU cores. Results are
    yielded one `(node, paths_dict)` pair at a time.

    Parameters
    ------------
    get_chunks : str, function (default = "chunks")
        A function that takes in an iterable of all the nodes as input and
        returns an iterable `node_chunks`. The default chunking is done by
        slicing the `G.nodes` into `n` chunks, where `n` is the number of
        CPU cores.

    networkx.single_source_all_shortest_paths : https://github.com/networkx/networkx/blob/de85e3fe52879f819e7a7924474fc6be3994e8e4/networkx/algorithms/shortest_paths/generic.py#L606
    """

    if hasattr(G, "graph_object"):
        G = G.graph_object

    def _paths_for_chunk(chunk):
        # Materialize (node, all-shortest-paths dict) pairs for one chunk.
        return [
            (
                node,
                dict(
                    single_source_all_shortest_paths(
                        G, node, weight=weight, method=method
                    )
                ),
            )
            for node in chunk
        ]

    nodes = G.nodes
    n_jobs = nxp.cpu_count()

    if get_chunks == "chunks":
        node_chunks = nxp.chunks(nodes, max(len(nodes) // n_jobs, 1))
    else:
        node_chunks = get_chunks(nodes)

    chunk_jobs = (delayed(_paths_for_chunk)(chunk) for chunk in node_chunks)

    for chunk_result in Parallel(n_jobs=n_jobs)(chunk_jobs):
        yield from chunk_result
Loading