diff --git a/README.md b/README.md index 44aeb931..a27cfa6f 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,8 @@ nx-parallel is a NetworkX backend that uses joblib for parallelization. This pro - [tournament_is_strongly_connected](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/tournament.py#L54) - [all_pairs_node_connectivity](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/connectivity/connectivity.py#L17) - [approximate_all_pairs_node_connectivity](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/approximation/connectivity.py#L12) -- [betweenness_centrality](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L16) +- [betweenness_centrality](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L19) +- [edge_betweenness_centrality](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L94) - [node_redundancy](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/bipartite/redundancy.py#L11) - [all_pairs_dijkstra](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/weighted.py#L28) - [all_pairs_dijkstra_path_length](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/weighted.py#L71) diff --git a/_nx_parallel/__init__.py b/_nx_parallel/__init__.py index 30648f3d..af79e551 100644 --- a/_nx_parallel/__init__.py +++ b/_nx_parallel/__init__.py @@ -67,12 +67,19 @@ def get_info(): }, }, "betweenness_centrality": { - "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L16", + "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L19", "additional_docs": "The parallel computation is implemented by dividing the nodes into chunks and computing betweenness centrality 
for each chunk concurrently.", "additional_parameters": { 'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n` chunks, where `n` is the number of CPU cores." }, }, + "edge_betweenness_centrality": { + "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L94", + "additional_docs": "The parallel computation is implemented by dividing the nodes into chunks and computing edge betweenness centrality for each chunk concurrently.", + "additional_parameters": { + 'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n` chunks, where `n` is the number of CPU cores." + }, + }, "node_redundancy": { "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/bipartite/redundancy.py#L11", "additional_docs": "In the parallel implementation we divide the nodes into chunks and compute the node redundancy coefficients for all `node_chunk` in parallel.", @@ -143,5 +150,23 @@ def get_info(): 'get_chunks : str, function (default = "chunks")': "A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `G.nodes` into `n` chunks, where `n` is the number of CPU cores." 
}, }, + "chunks": { + "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/chunk.py#L8", + "additional_docs": "Divides an iterable into chunks of size n", + "additional_parameters": None, + }, + "cpu_count": { + "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/chunk.py#L18", + "additional_docs": "Returns the number of logical CPUs or cores", + "additional_parameters": None, + }, + "create_iterables": { + "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/chunk.py#L26", + "additional_docs": "Creates an iterable of function inputs for parallel computation based on the provided iterator type.", + "additional_parameters": { + "G : NetworkX graph": "iterator : str Type of iterator. Valid values are 'node', 'edge', 'isolate'", + "iterable : Iterable": "An iterable of function inputs.", + }, + }, }, } diff --git a/benchmarks/benchmarks/bench_centrality.py b/benchmarks/benchmarks/bench_centrality.py index 24140de7..74cba68e 100644 --- a/benchmarks/benchmarks/bench_centrality.py +++ b/benchmarks/benchmarks/bench_centrality.py @@ -15,3 +15,7 @@ class Betweenness(Benchmark): def time_betweenness_centrality(self, backend, num_nodes, edge_prob): G = get_cached_gnp_random_graph(num_nodes, edge_prob) _ = nx.betweenness_centrality(G, backend=backend) + + def time_edge_betweenness_centrality(self, backend, num_nodes, edge_prob): + G = get_cached_gnp_random_graph(num_nodes, edge_prob, is_weighted=True) + _ = nx.edge_betweenness_centrality(G, backend=backend) diff --git a/nx_parallel/algorithms/centrality/betweenness.py b/nx_parallel/algorithms/centrality/betweenness.py index 1296700f..29765796 100644 --- a/nx_parallel/algorithms/centrality/betweenness.py +++ b/nx_parallel/algorithms/centrality/betweenness.py @@ -5,11 +5,14 @@ _rescale, _single_source_dijkstra_path_basic, _single_source_shortest_path_basic, + _rescale_e, + _add_edge_keys, + _accumulate_edges, ) from networkx.utils import py_random_state import 
nx_parallel as nxp
 
-__all__ = ["betweenness_centrality"]
+__all__ = ["betweenness_centrality", "edge_betweenness_centrality"]
 
 
 @py_random_state(5)
@@ -85,3 +88,79 @@ def _betweenness_centrality_node_subset(G, nodes, weight=None, endpoints=False):
         else:
             betweenness, delta = _accumulate_basic(betweenness, S, P, sigma, s)
     return betweenness
+
+
+@py_random_state(4)
+def edge_betweenness_centrality(
+    G, k=None, normalized=True, weight=None, seed=None, get_chunks="chunks"
+):
+    """The parallel computation is implemented by dividing the nodes into chunks and
+    computing edge betweenness centrality for each chunk concurrently.
+
+    networkx.edge_betweenness_centrality : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.centrality.edge_betweenness_centrality.html
+
+    Parameters
+    ----------
+    get_chunks : str, function (default = "chunks")
+        A function that takes in a list of all the nodes as input and returns an
+        iterable `node_chunks`. The default chunking is done by slicing the
+        `nodes` into `n` chunks, where `n` is the number of CPU cores.
+    """
+    # Accept a wrapper that exposes the underlying graph as `graph_object`.
+    if hasattr(G, "graph_object"):
+        G = G.graph_object
+
+    # Source nodes: all nodes, or a random sample of `k` of them.
+    if k is None:
+        nodes = G.nodes
+    else:
+        nodes = seed.sample(list(G.nodes), k)
+
+    total_cores = nxp.cpu_count()
+
+    if get_chunks == "chunks":
+        node_chunks = nxp.create_iterables(G, "node", total_cores, nodes)
+    else:
+        node_chunks = get_chunks(nodes)
+
+    # One partial result per chunk of source nodes.
+    bt_cs = Parallel(n_jobs=total_cores)(
+        delayed(_edge_betweenness_centrality_node_subset)(G, chunk, weight)
+        for chunk in node_chunks
+    )
+
+    # Reduce the partial solutions over edge keys only. Starting from a zeroed
+    # accumulator avoids an IndexError when no chunk was produced (e.g. an
+    # empty graph) and leaves out the per-node entries of the partial results.
+    bt_c = dict.fromkeys(G.edges(), 0.0)
+    for bt in bt_cs:
+        for e in bt_c:
+            bt_c[e] += bt[e]
+
+    # Pass `directed` explicitly: `_rescale_e` defaults to the undirected case,
+    # which would halve the unnormalized scores of a directed graph.
+    betweenness = _rescale_e(
+        bt_c, len(G), normalized=normalized, directed=G.is_directed(), k=k
+    )
+
+    if G.is_multigraph():
+        betweenness = _add_edge_keys(G, betweenness, weight=weight)
+
+    return betweenness
+
+
+def _edge_betweenness_centrality_node_subset(G, nodes, weight=None):
+    # Unscaled partial result for the sources in `nodes`, keyed by every node
+    # and every edge of `G`.
+    betweenness = dict.fromkeys(G, 0.0)  # b[v]=0 for v in G
+    # b[e]=0 for e in G.edges()
+    betweenness.update(dict.fromkeys(G.edges(), 0.0))
+    for s in nodes:
+        # single source shortest paths
+        if weight is None:  # use BFS
+            S, P, sigma, _ = _single_source_shortest_path_basic(G, s)
+        else:  # use Dijkstra's algorithm
+            S, P, sigma, _ = _single_source_dijkstra_path_basic(G, s, weight)
+        # accumulation
+        betweenness = _accumulate_edges(betweenness, S, P, sigma, s)
+    return betweenness
diff --git a/nx_parallel/interface.py b/nx_parallel/interface.py
index 8296b031..0c409731 100644
--- a/nx_parallel/interface.py
+++ b/nx_parallel/interface.py
@@ -1,5 +1,8 @@
 from nx_parallel.algorithms.bipartite.redundancy import node_redundancy
-from nx_parallel.algorithms.centrality.betweenness import betweenness_centrality
+from nx_parallel.algorithms.centrality.betweenness import (
+    betweenness_centrality,
+    edge_betweenness_centrality,
+)
 from nx_parallel.algorithms.shortest_paths.generic import all_pairs_all_shortest_paths
 from nx_parallel.algorithms.shortest_paths.weighted import (
     all_pairs_dijkstra,
@@
-74,6 +77,7 @@ class BackendInterface: # Centrality betweenness_centrality = betweenness_centrality + edge_betweenness_centrality = edge_betweenness_centrality # Efficiency local_efficiency = local_efficiency diff --git a/nx_parallel/tests/test_get_chunks.py b/nx_parallel/tests/test_get_chunks.py index 3a3b9f14..baaacb83 100644 --- a/nx_parallel/tests/test_get_chunks.py +++ b/nx_parallel/tests/test_get_chunks.py @@ -51,6 +51,7 @@ def random_chunking(nodes): ] chk_dict_vals = [ "betweenness_centrality", + "edge_betweenness_centrality", ] G = nx.fast_gnp_random_graph(50, 0.6, seed=42) H = nxp.ParallelGraph(G)