-
-
Notifications
You must be signed in to change notification settings - Fork 22
/
unweighted.py
103 lines (81 loc) · 3.54 KB
/
unweighted.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
"""
Shortest path parallel algorithms for unweighted graphs.
"""
from joblib import Parallel, delayed
import nx_parallel as nxp
from networkx.algorithms.shortest_paths.unweighted import (
single_source_shortest_path_length,
single_source_shortest_path,
)
# Public API of this module: the parallel all-pairs shortest-path entry points.
__all__ = ["all_pairs_shortest_path", "all_pairs_shortest_path_length"]
@nxp._configure_if_nx_active()
def all_pairs_shortest_path_length(G, cutoff=None, get_chunks="chunks"):
    """The parallel implementation first divides the nodes into chunks and then
    creates a generator of tasks that compute shortest path lengths for each node
    in a `node_chunk`, and then employs joblib's `Parallel` function to execute
    these computations in parallel across `n_jobs` number of CPU cores.

    networkx.all_pairs_shortest_path_length : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.shortest_paths.unweighted.all_pairs_shortest_path_length.html

    Parameters
    ----------
    get_chunks : str, function (default = "chunks")
        A function that takes in an iterable of all the nodes as input and returns
        an iterable `node_chunks`. The default chunking is done by slicing the
        `G.nodes` into at most `n_jobs` chunks of (nearly) equal size.
    """
    # Unwrap a backend graph wrapper (anything exposing `graph_object`) so the
    # NetworkX routine below receives the underlying graph. The helper reads `G`
    # at call time, so it always sees this unwrapped graph.
    if hasattr(G, "graph_object"):
        G = G.graph_object

    def _process_node_chunk(node_chunk):
        # Executed as one joblib task: a (source, lengths) pair for every node in
        # the chunk, in chunk order.
        return [
            (node, single_source_shortest_path_length(G, node, cutoff=cutoff))
            for node in node_chunk
        ]

    nodes = G.nodes
    if get_chunks == "chunks":
        n_jobs = nxp.get_n_jobs()
        # Ceiling division: chunks of this size split the nodes into at most
        # `n_jobs` chunks, avoiding a small leftover chunk that would need an
        # extra round of work. `max(..., 1)` keeps the size valid for an empty
        # graph. (Assumes `nxp.chunks` yields consecutive groups of
        # `num_in_chunk` nodes, as the docstring describes.)
        num_in_chunk = max((len(nodes) + n_jobs - 1) // n_jobs, 1)
        node_chunks = nxp.chunks(nodes, num_in_chunk)
    else:
        node_chunks = get_chunks(nodes)

    path_lengths_chunk_generator = (
        delayed(_process_node_chunk)(node_chunk) for node_chunk in node_chunks
    )
    # `Parallel()` gathers every chunk's result before the first pair is
    # yielded; results come back in submission order, so the output follows
    # the order of `node_chunks`.
    for path_length_chunk in Parallel()(path_lengths_chunk_generator):
        yield from path_length_chunk
@nxp._configure_if_nx_active()
def all_pairs_shortest_path(G, cutoff=None, get_chunks="chunks"):
    """The parallel implementation first divides the nodes into chunks and then
    creates a generator of tasks that compute shortest paths for each
    `node_chunk`, and then employs joblib's `Parallel` function to execute these
    computations in parallel across `n_jobs` number of CPU cores.

    networkx.all_pairs_shortest_path : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.shortest_paths.unweighted.all_pairs_shortest_path.html

    Parameters
    ----------
    get_chunks : str, function (default = "chunks")
        A function that takes in an iterable of all the nodes as input and returns
        an iterable `node_chunks`. The default chunking is done by slicing the
        `G.nodes` into at most `n_jobs` chunks of (nearly) equal size.
    """
    # Unwrap a backend graph wrapper (anything exposing `graph_object`) so the
    # NetworkX routine below receives the underlying graph. The helper reads `G`
    # at call time, so it always sees this unwrapped graph.
    if hasattr(G, "graph_object"):
        G = G.graph_object

    def _process_node_chunk(node_chunk):
        # Executed as one joblib task: a (source, paths) pair for every node in
        # the chunk, in chunk order.
        return [
            (node, single_source_shortest_path(G, node, cutoff=cutoff))
            for node in node_chunk
        ]

    nodes = G.nodes
    if get_chunks == "chunks":
        n_jobs = nxp.get_n_jobs()
        # Ceiling division: chunks of this size split the nodes into at most
        # `n_jobs` chunks, avoiding a small leftover chunk that would need an
        # extra round of work. `max(..., 1)` keeps the size valid for an empty
        # graph. (Assumes `nxp.chunks` yields consecutive groups of
        # `num_in_chunk` nodes, as the docstring describes.)
        num_in_chunk = max((len(nodes) + n_jobs - 1) // n_jobs, 1)
        node_chunks = nxp.chunks(nodes, num_in_chunk)
    else:
        node_chunks = get_chunks(nodes)

    paths_chunk_generator = (
        delayed(_process_node_chunk)(node_chunk) for node_chunk in node_chunks
    )
    # `Parallel()` gathers every chunk's result before the first pair is
    # yielded; results come back in submission order, so the output follows
    # the order of `node_chunks`.
    for path_chunk in Parallel()(paths_chunk_generator):
        yield from path_chunk