diff --git a/nx_parallel/algorithms/centrality/__init__.py b/nx_parallel/algorithms/centrality/__init__.py index cf7adb6..6d06d73 100644 --- a/nx_parallel/algorithms/centrality/__init__.py +++ b/nx_parallel/algorithms/centrality/__init__.py @@ -1 +1,2 @@ from .betweenness import * +from .reaching import * diff --git a/nx_parallel/algorithms/centrality/reaching.py b/nx_parallel/algorithms/centrality/reaching.py new file mode 100644 index 0000000..ced01a1 --- /dev/null +++ b/nx_parallel/algorithms/centrality/reaching.py @@ -0,0 +1,80 @@ +"""Parallel functions for computing reaching centrality of a node or a graph.""" + +import networkx as nx +from joblib import Parallel, delayed +import nx_parallel as nxp +from networkx.algorithms.centrality.betweenness import _average_weight + +__all__ = ["global_reaching_centrality", "local_reaching_centrality"] + + +def global_reaching_centrality(G, weight=None, normalized=True): + """Returns the global reaching centrality of a directed graph.""" + + if hasattr(G, "graph_object"): + G = G.graph_object + + if nx.is_negatively_weighted(G, weight=weight): + raise nx.NetworkXError("edge weights must be positive") + total_weight = G.size(weight=weight) + if total_weight <= 0: + raise nx.NetworkXError("Size of G must be positive") + + if weight is not None: + + def as_distance(u, v, d): + return total_weight / d.get(weight, 1) + + shortest_paths = nx.shortest_path(G, weight=as_distance) + else: + shortest_paths = nx.shortest_path(G) + + centrality = local_reaching_centrality + total_cores = nxp.cpu_count() + lrc = [ + Parallel(n_jobs=total_cores)( + delayed(centrality)( + G, node, paths=paths, weight=weight, normalized=normalized + ) + for node, paths in shortest_paths.items() + ) + ] + + max_lrc = max(lrc) + return sum(max_lrc - c for c in lrc) / (len(G) - 1) + + +def local_reaching_centrality(G, v, paths=None, weight=None, normalized=True): + """Returns the local reaching centrality of a node in a directed graph.""" + + if hasattr(G, "graph_object"): + G = G.graph_object + + if paths is None: + if nx.is_negatively_weighted(G, weight=weight): + raise nx.NetworkXError("edge weights must be positive") + total_weight = G.size(weight=weight) + if total_weight <= 0: + raise nx.NetworkXError("Size of G must be positive") + if weight is not None: + # Interpret weights as lengths. + def as_distance(u, v, d): + return total_weight / d.get(weight, 1) + + paths = nx.shortest_path(G, source=v, weight=as_distance) + else: + paths = nx.shortest_path(G, source=v) + # If the graph is unweighted, simply return the proportion of nodes + # reachable from the source node ``v``. + if weight is None and G.is_directed(): + return (len(paths) - 1) / (len(G) - 1) + if normalized and weight is not None: + norm = G.size(weight=weight) / G.size() + else: + norm = 1 + total_cores = nxp.cpu_count() + avgw = Parallel(n_jobs=total_cores)( + delayed(_average_weight)(G, path, weight=weight) for path in paths.values() + ) + sum_avg_weight = sum(avgw) / norm + return sum_avg_weight / (len(G) - 1) diff --git a/nx_parallel/interface.py b/nx_parallel/interface.py index d492029..19d29c8 100644 --- a/nx_parallel/interface.py +++ b/nx_parallel/interface.py @@ -1,4 +1,8 @@ from nx_parallel.algorithms.centrality.betweenness import betweenness_centrality +from nx_parallel.algorithms.centrality.reaching import ( + global_reaching_centrality, + local_reaching_centrality, +) from nx_parallel.algorithms.shortest_paths.weighted import all_pairs_bellman_ford_path from nx_parallel.algorithms.efficiency_measures import local_efficiency from nx_parallel.algorithms.isolate import number_of_isolates @@ -42,6 +46,8 @@ class Dispatcher: # Centrality betweenness_centrality = betweenness_centrality + global_reaching_centrality = global_reaching_centrality + local_reaching_centrality = local_reaching_centrality # Efficiency local_efficiency = local_efficiency