Skip to content

Commit

Permalink
Merge branch 'main' into shortest_paths
Browse files Browse the repository at this point in the history
  • Loading branch information
dschult committed Mar 11, 2024
2 parents 34ef348 + ea65283 commit 07db6dc
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 10 deletions.
40 changes: 30 additions & 10 deletions nx_parallel/algorithms/shortest_paths/weighted.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,27 +206,47 @@ def _process_node_chunk(node_chunk):
yield path_length


def all_pairs_bellman_ford_path(G, weight="weight"):
"""The parallel computation is implemented by computing the
shortest paths for each node concurrently.
def all_pairs_bellman_ford_path(G, weight="weight", get_chunks="chunks"):
"""The parallel implementation first divides the nodes into chunks and then
creates a generator to lazily compute shortest paths for each node_chunk, and
then employs joblib's `Parallel` function to execute these computations in
parallel across all available CPU cores.
Parameters
------------
get_chunks : str, function (default = "chunks")
A function that takes in an iterable of all the nodes as input and returns
an iterable `node_chunks`. The default chunking is done by slicing the
`G.nodes` into `n` chunks, where `n` is the number of CPU cores.
networkx.all_pairs_bellman_ford_path : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.shortest_paths.weighted.all_pairs_bellman_ford_path.html#all-pairs-bellman-ford-path
"""

def _calculate_shortest_paths_subset(source):
return (source, single_source_bellman_ford_path(G, source, weight=weight))
def _process_node_chunk(node_chunk):
return [
(node, single_source_bellman_ford_path(G, node, weight=weight))
for node in node_chunk
]

if hasattr(G, "graph_object"):
G = G.graph_object

cpu_count = nxp.cpu_count()

nodes = G.nodes
total_cores = nxp.cpu_count()

if get_chunks == "chunks":
num_in_chunk = max(len(nodes) // total_cores, 1)
node_chunks = nxp.chunks(nodes, num_in_chunk)
else:
node_chunks = get_chunks(nodes)

paths = Parallel(n_jobs=cpu_count, return_as="generator")(
delayed(_calculate_shortest_paths_subset)(source) for source in nodes
paths_chunk_generator = (
delayed(_process_node_chunk)(node_chunk) for node_chunk in node_chunks
)
return paths

for path_chunk in Parallel(n_jobs=nxp.cpu_count())(paths_chunk_generator):
for path in path_chunk:
yield path


def johnson(G, weight="weight", get_chunks="chunks"):
Expand Down
Binary file modified timing/heatmap_all_pairs_bellman_ford_path_timing.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit 07db6dc

Please sign in to comment.