Add MG python implementation of Leiden (#3566)
This PR adds a multi-GPU (MG) Python implementation of Leiden following the PLC (pylibcugraph) path; a minimal usage sketch is shown below.

closes #2489 
closes #2490 
closes #3431
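
For context, a minimal multi-GPU usage sketch of the new `cugraph.dask.leiden` entry point. This is not part of the diff; the CSV path, column names, and cluster setup are placeholders.

```python
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
import dask_cudf
import cugraph
import cugraph.dask as dcg
import cugraph.dask.comms.comms as Comms

# Spin up one worker per local GPU and initialize cuGraph's communicator.
cluster = LocalCUDACluster()
client = Client(cluster)
Comms.initialize(p2p=True)

# Placeholder edge list: src/dst vertex ids plus an edge weight column.
ddf = dask_cudf.read_csv(
    "edgelist.csv",
    delimiter=" ",
    names=["src", "dst", "wgt"],
    dtype=["int32", "int32", "float32"],
)

G = cugraph.Graph(directed=False)
G.from_dask_cudf_edgelist(ddf, source="src", destination="dst", edge_attr="wgt")

# New MG entry point added by this PR.
parts, modularity_score = dcg.leiden(
    G, max_iter=100, resolution=1.0, random_state=42, theta=1.0
)

Comms.destroy()
client.close()
cluster.close()
```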

Authors:
  - Joseph Nke (https://github.com/jnke2016)
  - Naim (https://github.com/naimnv)
  - Chuck Hastings (https://github.com/ChuckHastings)
  - Brad Rees (https://github.com/BradReesWork)

Approvers:
  - Chuck Hastings (https://github.com/ChuckHastings)
  - Rick Ratzel (https://github.com/rlratzel)

URL: #3566
jnke2016 authored May 26, 2023
1 parent 10bfaed commit 5ba3152
Showing 7 changed files with 401 additions and 41 deletions.
19 changes: 17 additions & 2 deletions python/cugraph/cugraph/community/leiden.py
@@ -11,7 +11,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from pylibcugraph import louvain as pylibcugraph_leiden
from pylibcugraph import leiden as pylibcugraph_leiden
from pylibcugraph import ResourceHandle
from cugraph.structure import Graph
import cudf
@@ -31,7 +31,11 @@


def leiden(
G: Union[Graph, "networkx.Graph"], max_iter: int = 100, resolution: float = 1.0
G: Union[Graph, "networkx.Graph"],
max_iter: int = 100,
resolution: float = 1.0,
random_state: int = None,
theta: float = 1.0,
) -> Tuple[cudf.DataFrame, float]:
"""
Compute the modularity optimizing partition of the input graph using the
@@ -64,6 +68,15 @@ def leiden(
communities, lower resolutions lead to fewer larger communities.
Defaults to 1.
random_state: int, optional (default=None)
Random state to use when generating samples. Optional argument,
defaults to a hash of process id, time, and hostname.
theta: float, optional (default=1.0)
Called theta in the Leiden algorithm, this is used to scale the
modularity gain during the refinement phase when computing the
probability of joining a random Leiden community.
Returns
-------
parts : cudf.DataFrame
@@ -93,9 +106,11 @@ def leiden(

vertex, partition, modularity_score = pylibcugraph_leiden(
resource_handle=ResourceHandle(),
random_state=random_state,
graph=G._plc_graph,
max_level=max_iter,
resolution=resolution,
theta=theta,
do_expensive_check=False,
)

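
The single-GPU wrapper above now forwards the two new keyword arguments to pylibcugraph. A quick sketch of the updated call, reusing the dataset from the docstring example; the parameter values are illustrative, not recommendations from the PR:

```python
import cugraph
from cugraph.experimental.datasets import karate

G = karate.get_graph(fetch=True)

# random_state makes the refinement reproducible; theta scales the modularity
# gain used when choosing a random community during refinement.
parts, modularity_score = cugraph.leiden(G, random_state=42, theta=1.0)
print(parts.head())
print(modularity_score)
```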
1 change: 1 addition & 0 deletions python/cugraph/cugraph/dask/__init__.py
@@ -32,3 +32,4 @@
from .link_prediction.jaccard import jaccard
from .link_prediction.sorensen import sorensen
from .link_prediction.overlap import overlap
from .community.leiden import leiden
1 change: 1 addition & 0 deletions python/cugraph/cugraph/dask/community/__init__.py
@@ -14,3 +14,4 @@
from .louvain import louvain
from .triangle_count import triangle_count
from .induced_subgraph import induced_subgraph
from .leiden import leiden
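
With the two re-exports above in place, the MG implementation is reachable from either module path. A trivial check (assuming the package built from this branch is installed):

```python
import cugraph.dask as dcg
from cugraph.dask.community.leiden import leiden

# Both names refer to the same function object re-exported above.
assert dcg.leiden is leiden
```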
189 changes: 189 additions & 0 deletions python/cugraph/cugraph/dask/community/leiden.py
@@ -0,0 +1,189 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import annotations

from dask.distributed import wait, default_client
import cugraph.dask.comms.comms as Comms
import dask_cudf
import dask
from dask import delayed
import cudf

from pylibcugraph import ResourceHandle
from pylibcugraph import leiden as pylibcugraph_leiden
import numpy
import cupy as cp
from typing import Tuple, TYPE_CHECKING

if TYPE_CHECKING:
from cugraph import Graph


def convert_to_cudf(result: cp.ndarray) -> Tuple[cudf.DataFrame, float]:
"""
Creates a cudf DataFrame from cupy arrays from pylibcugraph wrapper
"""
cupy_vertex, cupy_partition, modularity = result
df = cudf.DataFrame()
df["vertex"] = cupy_vertex
df["partition"] = cupy_partition

return df, modularity


def _call_plc_leiden(
sID: bytes,
mg_graph_x,
max_iter: int,
resolution: float,
random_state: int,
theta: float,
do_expensive_check: bool,
) -> Tuple[cp.ndarray, cp.ndarray, float]:
return pylibcugraph_leiden(
resource_handle=ResourceHandle(Comms.get_handle(sID).getHandle()),
random_state=random_state,
graph=mg_graph_x,
max_level=max_iter,
resolution=resolution,
theta=theta,
do_expensive_check=do_expensive_check,
)


def leiden(
input_graph: Graph,
max_iter: int = 100,
resolution: float = 1.0,
random_state: int = None,
theta: float = 1.0,
) -> Tuple[dask_cudf.DataFrame, float]:
"""
Compute the modularity optimizing partition of the input graph using the
Leiden method
Traag, V. A., Waltman, L., & van Eck, N. J. (2019). From Louvain to Leiden:
guaranteeing well-connected communities. Scientific reports, 9(1), 5233.
doi: 10.1038/s41598-019-41695-z
Parameters
----------
input_graph : cugraph.Graph
The graph descriptor should contain the connectivity information
and weights. The adjacency list will be computed if not already
present.
The current implementation only supports undirected graphs.
max_iter : integer, optional (default=100)
This controls the maximum number of levels/iterations of the Leiden
algorithm. When specified, the algorithm will terminate after no more
than the specified number of iterations. No error occurs when the
algorithm terminates early in this manner.
resolution: float, optional (default=1.0)
Called gamma in the modularity formula, this changes the size
of the communities. Higher resolutions lead to more, smaller
communities; lower resolutions lead to fewer, larger communities.
Defaults to 1.
random_state: int, optional (default=None)
Random state to use when generating samples. Optional argument,
defaults to a hash of process id, time, and hostname.
theta: float, optional (default=1.0)
Called theta in the Leiden algorithm, this is used to scale the
modularity gain during the refinement phase when computing the
probability of joining a random Leiden community.
Returns
-------
parts : dask_cudf.DataFrame
GPU data frame of size V containing two columns: the vertex id and the
partition id it is assigned to.
ddf['vertex'] : cudf.Series
Contains the vertex identifiers
ddf['partition'] : cudf.Series
Contains the partition assigned to the vertices
modularity_score : float
a floating point number containing the global modularity score of the
partitioning.
Examples
--------
>>> from cugraph.experimental.datasets import karate
>>> G = karate.get_graph(fetch=True)
>>> parts, modularity_score = cugraph.leiden(G)
"""

if input_graph.is_directed():
raise ValueError("input graph must be undirected")

# Get the client that has already been started
client = default_client()

do_expensive_check = False

result = [
client.submit(
_call_plc_leiden,
Comms.get_session_id(),
input_graph._plc_graph[w],
max_iter,
resolution,
random_state,
theta,
do_expensive_check,
workers=[w],
allow_other_workers=False,
)
for w in Comms.get_workers()
]

wait(result)

part_mod_score = [client.submit(convert_to_cudf, r) for r in result]
wait(part_mod_score)

vertex_dtype = input_graph.edgelist.edgelist_df.dtypes[0]
empty_df = cudf.DataFrame(
{
"vertex": numpy.empty(shape=0, dtype=vertex_dtype),
"partition": numpy.empty(shape=0, dtype="int32"),
}
)

part_mod_score = [delayed(lambda x: x, nout=2)(r) for r in part_mod_score]

ddf = dask_cudf.from_delayed(
[r[0] for r in part_mod_score], meta=empty_df, verify_meta=False
).persist()

mod_score = dask.array.from_delayed(
part_mod_score[0][1], shape=(1,), dtype=float
).compute()

wait(ddf)
wait(mod_score)

wait([r.release() for r in part_mod_score])

if input_graph.renumbered:
ddf = input_graph.unrenumber(ddf, "vertex")

return ddf, mod_score
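
The tail of the new MG function gathers one `(DataFrame, modularity)` tuple per worker, wraps each future in an indexable `delayed`, builds a `dask_cudf.DataFrame` from the frame halves, and reads the (identical) modularity score from the first worker only. A simplified stand-in for that pattern follows; it is not the PR's code, and `fake_worker_result` is a made-up placeholder for `convert_to_cudf` applied to one worker's PLC result.

```python
import cudf
import dask_cudf
import numpy
from dask import delayed


def fake_worker_result(offset):
    # Stand-in for one worker's (DataFrame, modularity) pair.
    df = cudf.DataFrame(
        {
            "vertex": cudf.Series([offset, offset + 1], dtype="int64"),
            "partition": cudf.Series([0, 1], dtype="int32"),
        }
    )
    return df, 0.42


# nout=2 makes each Delayed indexable, mirroring delayed(lambda x: x, nout=2).
part_mod_score = [delayed(fake_worker_result, nout=2)(o) for o in (0, 2)]

empty_df = cudf.DataFrame(
    {
        "vertex": numpy.empty(shape=0, dtype="int64"),
        "partition": numpy.empty(shape=0, dtype="int32"),
    }
)

# Frame halves become a distributed DataFrame; the score is read once.
ddf = dask_cudf.from_delayed([r[0] for r in part_mod_score], meta=empty_df)
mod_score = part_mod_score[0][1].compute()

print(ddf.compute())
print(mod_score)
```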
67 changes: 40 additions & 27 deletions python/cugraph/cugraph/dask/community/louvain.py
@@ -13,28 +13,40 @@
# limitations under the License.
#

from __future__ import annotations

from dask.distributed import wait, default_client
import cugraph.dask.comms.comms as Comms
import dask_cudf
import dask
from dask import delayed
import cudf
import operator as op
import cupy as cp
import numpy

from pylibcugraph import ResourceHandle
from pylibcugraph import louvain as pylibcugraph_louvain
from typing import Tuple, TYPE_CHECKING

if TYPE_CHECKING:
from cugraph import Graph


def convert_to_cudf(cupy_vertex, cupy_partition):
def convert_to_cudf(result: cp.ndarray) -> Tuple[cudf.DataFrame, float]:
"""
Creates a cudf DataFrame from cupy arrays from pylibcugraph wrapper
"""
cupy_vertex, cupy_partition, modularity = result
df = cudf.DataFrame()
df["vertex"] = cupy_vertex
df["partition"] = cupy_partition

return df
return df, modularity


def _call_plc_louvain(sID, mg_graph_x, max_iter, resolution, do_expensive_check):
def _call_plc_louvain(
sID: bytes, mg_graph_x, max_iter: int, resolution: float, do_expensive_check: bool
) -> Tuple[cp.ndarray, cp.ndarray, float]:
return pylibcugraph_louvain(
resource_handle=ResourceHandle(Comms.get_handle(sID).getHandle()),
graph=mg_graph_x,
@@ -44,7 +56,9 @@ def _call_plc_louvain(sID, mg_graph_x, max_iter, resolution, do_expensive_check)
)


def louvain(input_graph, max_iter=100, resolution=1.0):
def louvain(
input_graph: Graph, max_iter: int = 100, resolution: float = 1.0
) -> Tuple[dask_cudf.DataFrame, float]:
"""
Compute the modularity optimizing partition of the input graph using the
Louvain method
@@ -122,32 +136,31 @@ def louvain(input_graph, max_iter=100, resolution=1.0):

wait(result)

# futures is a list of Futures containing tuples of (DataFrame, mod_score),
# unpack using separate calls to client.submit with a callable to get
# individual items.
# FIXME: look into an alternate way (not returning a tuples, accessing
# tuples differently, etc.) since multiple client.submit() calls may not be
# optimal.
result_vertex = [client.submit(op.getitem, f, 0) for f in result]
result_partition = [client.submit(op.getitem, f, 1) for f in result]
mod_score = [client.submit(op.getitem, f, 2) for f in result]

cudf_result = [
client.submit(convert_to_cudf, cp_vertex_arrays, cp_partition_arrays)
for cp_vertex_arrays, cp_partition_arrays in zip(
result_vertex, result_partition
)
]
part_mod_score = [client.submit(convert_to_cudf, r) for r in result]
wait(part_mod_score)

vertex_dtype = input_graph.edgelist.edgelist_df.dtypes[0]
empty_df = cudf.DataFrame(
{
"vertex": numpy.empty(shape=0, dtype=vertex_dtype),
"partition": numpy.empty(shape=0, dtype="int32"),
}
)

part_mod_score = [delayed(lambda x: x, nout=2)(r) for r in part_mod_score]

ddf = dask_cudf.from_delayed(
[r[0] for r in part_mod_score], meta=empty_df, verify_meta=False
).persist()

wait(cudf_result)
# Each worker should have computed the same mod_score
mod_score = mod_score[0].result()
mod_score = dask.array.from_delayed(
part_mod_score[0][1], shape=(1,), dtype=float
).compute()

ddf = dask_cudf.from_delayed(cudf_result).persist()
wait(ddf)
wait(mod_score)

# Wait until the inactive futures are released
wait([(r.release(), c_r.release()) for r, c_r in zip(result, cudf_result)])
wait([r.release() for r in part_mod_score])

if input_graph.renumbered:
ddf = input_graph.unrenumber(ddf, "vertex")
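
The louvain changes above drop the per-element `client.submit(op.getitem, ...)` unpacking (and the FIXME that questioned it) in favour of one indexable `delayed` wrapper per worker result, matching the new Leiden code. A small self-contained illustration of the two styles; this is an assumption-level sketch, not cuGraph code, and `worker_result` is a made-up stand-in for a worker's output tuple.

```python
import operator as op
from dask import delayed
from dask.distributed import Client

client = Client(processes=False)  # in-process scheduler; no GPUs needed here


def worker_result(i):
    # Stand-in for one worker's (vertices, partitions, modularity) tuple.
    return [i, i + 1], [0, 1], 0.42


futures = [client.submit(worker_result, i) for i in range(2)]

# Old pattern: one extra client.submit per tuple element.
mod_old = client.submit(op.getitem, futures[0], 2).result()

# New pattern: wrap each future once; the Delayed is indexable because nout=3.
parts = [delayed(lambda x: x, nout=3)(f) for f in futures]
mod_new = parts[0][2].compute()

assert mod_old == mod_new == 0.42
client.close()
```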