Add MG python implementation of Leiden #3566

Merged
merged 186 commits into from
May 26, 2023
Commits
7903391
Renames variables
Sep 5, 2022
063821a
intermediate commit: Primitives call to compute some components of Le…
Oct 12, 2022
6791a65
Adds leiden to louvain mapping.
Oct 18, 2022
1a043be
Adds leiden to louvain mapping.
Oct 20, 2022
fa086ab
Returns leiden to louvain mapping from refined partition
Oct 20, 2022
b461d0b
Return renumbering map from graph_contraction
Oct 21, 2022
4ac3f47
Preserve original louvain partition of refined graph
Oct 25, 2022
40a99f4
Merge branch 'branch-22.12' of github.com:rapidsai/cugraph into leide…
Oct 25, 2022
2b4f37d
Fix errors that resulted in merging new changes from 22.12
Oct 28, 2022
46db0ca
Add view_concat for edge_minor_property_view_t, change transform_redu…
Nov 2, 2022
8ef174b
Isolate refine_clustering in a separate file to make compilation fast…
Nov 3, 2022
43b6605
Merge branch 'branch-22.12' of github.com:rapidsai/cugraph into leide…
Nov 16, 2022
4f5d3ea
Use per_v_transform_reduce_dst_key_aggregated_outgoing_e with kv_stor…
Nov 17, 2022
5057a83
Merge branch 'rapidsai:branch-22.12' into leiden-on-22-12
naimnv Nov 18, 2022
ec765b8
MIS to find a non-conflicting set of moves
Nov 22, 2022
90b8371
Fix merge issues
Nov 25, 2022
4c3875f
Fix merge issues
Nov 25, 2022
ba9233e
Merge branch 'leiden-on-22-12' of github.com:naimnv/cugraph-forked in…
Nov 25, 2022
b436333
Removes duplicate code and unused functors
Nov 29, 2022
fb4f080
Merge branch 'rapidsai:branch-22.12' into leiden-on-22-12
naimnv Nov 29, 2022
040f40e
Removes duplicate code
Dec 5, 2022
4770fe1
Renames function
Dec 5, 2022
6733030
Rename function
Dec 5, 2022
18cfa9c
Update leiden assignment according to MIS chosen moves
Dec 6, 2022
60d4c67
Change break condition to compute MIS and assign louvain partition to…
Dec 29, 2022
865df23
Adds MG instances of refine, mis and leiden, fixes import issues
Jan 6, 2023
346b60c
Adds missing SG MG specialization for lookup_primitive_values_for_key…
Jan 7, 2023
a8135b9
Fixes mis implementation
Jan 8, 2023
16fb1fc
Use env local nvcc and specific version of gcc/g++
Jan 11, 2023
8440d17
Initial implementation of the Leiden C API
ChuckHastings Jan 20, 2023
1f638ba
Debug problem related to weight change
Feb 2, 2023
7fb7e10
Debug problem related to weight change
Feb 3, 2023
5cefd78
Debug problem related to weight change
Feb 3, 2023
f1f5c95
Debug problem related to weight change
Feb 3, 2023
b37b669
Debug problem related to weight change
Feb 3, 2023
ce09b7c
Debug problem related to weight change
Feb 5, 2023
45a295a
Debug problem related to weight change
Feb 5, 2023
a648f8e
Debug problem related to weight change
Feb 9, 2023
7a130f1
Merge branch 'branch-23.04' into define_leiden_c_api
ChuckHastings Feb 9, 2023
87b9a5f
pre-commit
ChuckHastings Feb 9, 2023
1f28045
Debug problem related to MIS
Feb 17, 2023
4455c3f
Debug problem related to MIS/refine loop
Feb 17, 2023
449628c
Debug problem related to MIS/refine loop
Feb 19, 2023
e9aefcc
Debug problem related to MIS/refine loop
Feb 21, 2023
73225a8
TODO: MIS implementaion needs to be rechecked
Feb 28, 2023
fae443e
TODO: MIS implementaion needs to be rechecked
Dec 5, 2022
02575df
Merge Leiden with 23.04
Mar 3, 2023
b501da9
Fix merge issues
Mar 4, 2023
f17dbb7
TODO: Fix MIS implementaion
Mar 6, 2023
6206c0a
Merge branch 'leiden-on-23.04' into leiden-on-22-12
Mar 7, 2023
baba5c7
Fix merge issue
Mar 7, 2023
0aa06e4
Re-implement MIS
Mar 8, 2023
f8b3633
Cleanup MIS code
Mar 9, 2023
6b42b71
TODO: Check MIS implementation
Mar 9, 2023
9768abe
TODO: Check MIS implementation
Mar 9, 2023
fa91923
TODO: Check MIS implementation
Mar 9, 2023
fbd0120
TODO: Check MIS implementation
Mar 9, 2023
b56362b
TODO: Check MIS implementation
Mar 9, 2023
3246d4e
TODO: Check MIS implementation
Mar 9, 2023
5d57768
TODO: Check MIS implementation
Mar 9, 2023
84b9947
TODO: Check MIS implementation
Mar 10, 2023
9baaa88
Change fraction of vertices slected in each iteration of MIS loop
Mar 10, 2023
632af8b
remove unused import
jnke2016 Mar 12, 2023
50f0442
remove unused import
jnke2016 Mar 12, 2023
ae5e6c0
Merge remote-tracking branch 'upstream/define_leiden_c_api' into bran…
jnke2016 Mar 13, 2023
3b6ab3e
add plc implementation of Leiden
jnke2016 Mar 13, 2023
ae22cdc
add python implementation of Leiden
jnke2016 Mar 13, 2023
b6b20d4
fix style
jnke2016 Mar 13, 2023
fa41d51
Debug max reduction
Mar 13, 2023
7f90281
Merge branch 'branch-23.04' of github.com:rapidsai/cugraph into leide…
Mar 13, 2023
ec7bedd
Debug max reduction
Mar 13, 2023
562022b
Debug MIS with int ranks
Mar 15, 2023
c554705
Debug MIS with int ranks
Mar 15, 2023
6699a8d
Debug max reduction
Mar 15, 2023
d57d62a
Debug MIS with int ranks
Mar 15, 2023
6318b3d
With a working version of MIS
Mar 15, 2023
3582131
fetch latest changes
jnke2016 Mar 17, 2023
8950e1a
drop mg leiden
jnke2016 Mar 17, 2023
11f8329
fix style
jnke2016 Mar 17, 2023
c262b1d
fix style
jnke2016 Mar 17, 2023
28f1471
clean code to make a PR
Mar 16, 2023
10be7a7
Resize and shrink device vectors in refine.cuh
Mar 26, 2023
7ca5696
Resize and shrink device vectors in leiden_impl.cuh
Mar 26, 2023
44660c8
Remove debug statements
Mar 26, 2023
4dc7ffe
Remove unused variable
Mar 27, 2023
9f1ed0c
Change variable name, modify comments, remove unused file
Mar 27, 2023
ad33955
Fix copyright for Leiden and maximal independent set
Mar 27, 2023
5baa124
Merge branch 'branch-23.04' of github.com:rapidsai/cugraph into leide…
Mar 27, 2023
e5f8aca
Fix Leiden PR issues
Mar 28, 2023
3e4fcb2
Pass value for missing parameter #2980
Mar 28, 2023
5074360
Merge branch 'branch-23.04' of github.com:rapidsai/cugraph into leide…
Mar 28, 2023
1b266c2
Merge branch 'branch-23.04' of github.com:rapidsai/cugraph into leide…
Mar 28, 2023
3b6f81c
Merge branch 'leiden-on-22-12' of github.com:naimnv/cugraph-forked in…
Mar 28, 2023
35f3472
Merge branch 'branch-23.04' of github.com:rapidsai/cugraph into leide…
Mar 28, 2023
ae455ab
Add missing placeholer return value #2980
Mar 28, 2023
52d15cb
Add missing template parameter to function call #2980
Mar 28, 2023
868b113
Merge remote-tracking branch 'upstream/branch-23.04' into branch-23.0…
Mar 29, 2023
8afcd99
Merge remote-tracking branch 'upstream/leiden-on-22-12' into branch-2…
Mar 29, 2023
1ea3f62
remove legacy 'leiden' and replace it with the one leveraging the CAPI
Mar 29, 2023
fbcb767
fix typo
Mar 29, 2023
5b2bc92
add more tests
Mar 29, 2023
0315c50
remove temporary import
Mar 29, 2023
b910b4d
enable leiden in C API
ChuckHastings Mar 30, 2023
82c49e7
update leiden test results
Mar 30, 2023
c875bba
update test
Mar 30, 2023
38277dc
return original results if renumbering did not occur
Mar 30, 2023
a9418ea
fix style
Mar 30, 2023
41c7137
Merge remote-tracking branch 'upstream/branch-23.04' into branch-23.0…
Mar 30, 2023
cbf26b0
fix style
Mar 30, 2023
57c55cb
fix copyright
Mar 30, 2023
31e6adf
Unit test MG MIS
Apr 25, 2023
ca17c54
fetch latest change
Apr 26, 2023
e2b7560
fetch latest change
Apr 26, 2023
4caab42
fetch latest change
Apr 26, 2023
696d51c
update type annotation
Apr 26, 2023
e22fa87
update tests
Apr 26, 2023
dd0a286
fix style
Apr 26, 2023
58c36ea
Merge branch 'branch-23.06' of github.com:rapidsai/cugraph into mnmg_mis
Apr 26, 2023
b4e93ad
Add MG leiden test structure
May 5, 2023
6ccc2ba
Merge branch 'branch-23.06' of github.com:rapidsai/cugraph into mnmg_mis
May 5, 2023
4ad2275
collect_values_for_keys test
May 8, 2023
0f44c46
collect_values_for_keys test
May 8, 2023
e207d0b
update docstrings
May 12, 2023
6070c7a
fix style
May 12, 2023
4bd6d38
fix typo
May 12, 2023
dde35ea
Merge remote-tracking branch 'upstream/branch-23.06' into branch-23.0…
May 12, 2023
7febd79
Merge remote-tracking branch 'upstream/branch-23.06' into branch-23.0…
May 15, 2023
1fd04e5
Debug branch for MG Leiden
May 12, 2023
98bf51a
Clean mis, leiden mg test
May 16, 2023
15cb8c8
Merge remote-tracking branch 'upstream/branch-23.06' into branch-23.0…
May 16, 2023
802548c
Merge remote-tracking branch 'upstream/branch-23.06' into branch-23.0…
May 16, 2023
797823d
add mg implementation of leiden
May 16, 2023
e53df38
add MG Leiden tests
May 16, 2023
b5fabb8
Merge remote-tracking branch 'upstream/branch-23.04_fea-plc_leiden' i…
May 16, 2023
4e730e0
Clean mis, leiden mg test, debug mg leiden with random moves
May 18, 2023
7c610e9
MG Leiden and MG MIS
May 18, 2023
582eeb3
MG Leiden, MG MIS
May 18, 2023
528fd9f
MG Leiden, MG MIS
May 18, 2023
8b90bd5
Copyright fix
May 18, 2023
4471aef
Merge remote-tracking branch 'upstream/mnmg_mis' into branch-23.06_fe…
May 18, 2023
6764476
Removes debug stuffs from refine_impl.cuh
May 18, 2023
9809f42
Removes tests using local files
May 18, 2023
318357b
Remove print statement
May 18, 2023
86074ef
Remove test with lcoal file
May 18, 2023
ab7ad07
Address some PR comments
May 18, 2023
6c9aedb
Use thrust random generator inside device code
May 19, 2023
cfca807
Use thrust random generator inside device code
May 19, 2023
3d52822
enable mg leiden in the CAPI
May 19, 2023
2769233
add mg leiden
May 19, 2023
a3128a6
Use raft random gernerator instead of thrust
May 19, 2023
8e2ffe5
Add RNG state parameter to Leiden
May 19, 2023
f3356b1
Add doc string for theta
May 19, 2023
9e59a1d
Change pylibcugraph api due to chagne in c-api
May 20, 2023
810d2f4
Pull upstream changes
May 20, 2023
575bde4
Rename compute_mis to maximal_independent_set
May 20, 2023
ee7ad8a
style fix
May 20, 2023
67b33ae
Replace for_each with transform_if
May 20, 2023
6a630fa
Make the cluster ids consecutive
May 20, 2023
7a3019e
Update c_api code for leiden
May 21, 2023
e2c33ae
update future extraction
May 21, 2023
fea752d
remove outdated comment
May 21, 2023
2a0f857
update type annotation
May 21, 2023
68080b1
remove unsued import
May 21, 2023
9071f00
Merge remote-tracking branch 'upstream/mnmg_mis' into branch-23.06_fe…
May 22, 2023
c3fa04d
update plc leiden call at the python layer as it now support a random…
May 22, 2023
1f36df7
update docstrings
May 22, 2023
ca408c5
update dask 'future' extraction and add type annotation
May 22, 2023
972c514
remove unused import
May 22, 2023
a9ebb2c
fix style
May 22, 2023
e0f1217
Fix sg rng_state initialization
May 22, 2023
788517e
Fix mg leiden c-api test
May 22, 2023
b12b619
Rename sg leiden cpp test and cosmetic fix for CMakeLists.txt
May 22, 2023
afbe156
Expose theta to c and python api
May 22, 2023
22f2df6
Merge remote-tracking branch 'upstream/mnmg_mis' into branch-23.06_fe…
May 23, 2023
4737002
Merge remote-tracking branch 'upstream/branch-23.06' into branch-23.0…
May 23, 2023
837870f
fix circular import error
May 23, 2023
2b0fb14
Merge remote-tracking branch 'upstream/branch-23.06' into branch-23.0…
May 23, 2023
25a5c11
update docstrings
May 24, 2023
c2c6a04
fix style
May 24, 2023
14f04eb
update leiden tests
May 24, 2023
39a87eb
update leiden API
May 24, 2023
f6c303b
add comments
May 24, 2023
b09f410
fix style
May 24, 2023
f49c423
Merge remote-tracking branch 'upstream/branch-23.06' into branch-23.0…
May 24, 2023
aae8fb7
remove unused code
May 24, 2023
1d98ee3
Merge branch 'branch-23.06' into branch-23.06_fea-mg_leiden
BradReesWork May 26, 2023
19 changes: 17 additions & 2 deletions python/cugraph/cugraph/community/leiden.py
Expand Up @@ -11,7 +11,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from pylibcugraph import louvain as pylibcugraph_leiden
from pylibcugraph import leiden as pylibcugraph_leiden
from pylibcugraph import ResourceHandle
from cugraph.structure import Graph
import cudf
Expand All @@ -31,7 +31,11 @@


def leiden(
    G: Union[Graph, "networkx.Graph"], max_iter: int = 100, resolution: float = 1.0
    G: Union[Graph, "networkx.Graph"],
    max_iter: int = 100,
    resolution: float = 1.0,
    random_state: int = None,
    theta: float = 1.0,
) -> Tuple[cudf.DataFrame, float]:
    """
    Compute the modularity optimizing partition of the input graph using the
Expand Down Expand Up @@ -64,6 +68,15 @@ def leiden(
        communities, lower resolutions lead to fewer larger communities.
        Defaults to 1.

    random_state: int, optional (default=None)
        Random state to use when generating samples. Optional argument,
        defaults to a hash of process id, time, and hostname.

    theta: float, optional (default=1.0)
        Called theta in the Leiden algorithm. It scales the modularity
        gain in the Leiden refinement phase when computing the
        probability of joining a random Leiden community.

    Returns
    -------
    parts : cudf.DataFrame
Expand Down Expand Up @@ -93,9 +106,11 @@ def leiden(

    vertex, partition, modularity_score = pylibcugraph_leiden(
        resource_handle=ResourceHandle(),
        random_state=random_state,
        graph=G._plc_graph,
        max_level=max_iter,
        resolution=resolution,
        theta=theta,
        do_expensive_check=False,
    )
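
The `theta` docstring in this file says the parameter scales the modularity gain when computing the probability of joining a random Leiden community. A minimal sketch of that idea follows, assuming the rule from the Leiden paper (Traag et al., 2019), where a candidate community is chosen with probability proportional to `exp(gain / theta)` and only non-negative gains are eligible. The function name `join_probabilities` and the dictionary input are hypothetical, and whether the cuGraph kernel uses exactly this formula is an assumption, not something this diff confirms.

```python
import math


def join_probabilities(gains, theta=1.0):
    """Return {community: probability} from per-community modularity gains.

    Hypothetical helper: follows the randomized selection rule described in
    the Leiden paper, not necessarily the cuGraph implementation.
    """
    if theta <= 0:
        raise ValueError("theta must be positive")

    # Only moves that do not decrease modularity are candidates.
    eligible = {c: g for c, g in gains.items() if g >= 0}
    if not eligible:
        return {}

    # Subtract the largest gain before exponentiating for numerical stability;
    # this does not change the normalized probabilities.
    top = max(eligible.values())
    weights = {c: math.exp((g - top) / theta) for c, g in eligible.items()}
    total = sum(weights.values())
    return {c: w / total for c, w in weights.items()}


gains = {"a": 0.2, "b": 0.1, "c": -0.3}
print(join_probabilities(gains, theta=1.0))   # fairly even split between a and b
print(join_probabilities(gains, theta=0.01))  # almost all mass on a
```

Under this rule a small `theta` makes the choice nearly greedy, while a large `theta` makes it closer to uniform over the eligible communities.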

Expand Down
1 change: 1 addition & 0 deletions python/cugraph/cugraph/dask/__init__.py
Expand Up @@ -32,3 +32,4 @@
from .link_prediction.jaccard import jaccard
from .link_prediction.sorensen import sorensen
from .link_prediction.overlap import overlap
from .community.leiden import leiden
1 change: 1 addition & 0 deletions python/cugraph/cugraph/dask/community/__init__.py
Expand Up @@ -14,3 +14,4 @@
from .louvain import louvain
from .triangle_count import triangle_count
from .induced_subgraph import induced_subgraph
from .leiden import leiden
189 changes: 189 additions & 0 deletions python/cugraph/cugraph/dask/community/leiden.py
@@ -0,0 +1,189 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import annotations

from dask.distributed import wait, default_client
import cugraph.dask.comms.comms as Comms
import dask_cudf
import dask
from dask import delayed
import cudf

from pylibcugraph import ResourceHandle
from pylibcugraph import leiden as pylibcugraph_leiden
import numpy
import cupy as cp
from typing import Tuple, TYPE_CHECKING

if TYPE_CHECKING:
    from cugraph import Graph


def convert_to_cudf(
    result: Tuple[cp.ndarray, cp.ndarray, float]
) -> Tuple[cudf.DataFrame, float]:
    """
    Creates a cudf DataFrame from cupy arrays from pylibcugraph wrapper
    """
    cupy_vertex, cupy_partition, modularity = result
    df = cudf.DataFrame()
    df["vertex"] = cupy_vertex
    df["partition"] = cupy_partition

    return df, modularity


def _call_plc_leiden(
    sID: bytes,
    mg_graph_x,
    max_iter: int,
    resolution: float,
    random_state: int,
    theta: float,
    do_expensive_check: bool,
) -> Tuple[cp.ndarray, cp.ndarray, float]:
    return pylibcugraph_leiden(
        resource_handle=ResourceHandle(Comms.get_handle(sID).getHandle()),
        random_state=random_state,
        graph=mg_graph_x,
        max_level=max_iter,
        resolution=resolution,
        theta=theta,
        do_expensive_check=do_expensive_check,
    )


def leiden(
    input_graph: Graph,
    max_iter: int = 100,
    resolution: float = 1.0,
    random_state: int = None,
    theta: float = 1.0,
) -> Tuple[dask_cudf.DataFrame, float]:
    """
    Compute the modularity optimizing partition of the input graph using the
    Leiden method

    Traag, V. A., Waltman, L., & van Eck, N. J. (2019). From Louvain to Leiden:
    guaranteeing well-connected communities. Scientific reports, 9(1), 5233.
    doi: 10.1038/s41598-019-41695-z

    Parameters
    ----------
    input_graph : cugraph.Graph
        The graph descriptor should contain the connectivity information
        and weights. The adjacency list will be computed if not already
        present.
        The current implementation only supports undirected graphs.

    max_iter : integer, optional (default=100)
        This controls the maximum number of levels/iterations of the Leiden
        algorithm. When specified the algorithm will terminate after no more
        than the specified number of iterations. No error occurs when the
        algorithm terminates early in this manner.

    resolution: float, optional (default=1.0)
        Called gamma in the modularity formula, this changes the size
        of the communities. Higher resolutions lead to more smaller
        communities, lower resolutions lead to fewer larger communities.
        Defaults to 1.

    random_state: int, optional (default=None)
        Random state to use when generating samples. Optional argument,
        defaults to a hash of process id, time, and hostname.

    theta: float, optional (default=1.0)
        Called theta in the Leiden algorithm. It scales the modularity
        gain in the Leiden refinement phase when computing the
        probability of joining a random Leiden community.

    Returns
    -------
    parts : dask_cudf.DataFrame
        GPU data frame of size V containing two columns: the vertex id and
        the partition id it is assigned to.

        ddf['vertex'] : cudf.Series
            Contains the vertex identifiers
        ddf['partition'] : cudf.Series
            Contains the partition assigned to the vertices

    modularity_score : float
        A floating point number containing the global modularity score of
        the partitioning.

    Examples
    --------
    >>> from cugraph.experimental.datasets import karate
    >>> G = karate.get_graph(fetch=True)
    >>> parts, modularity_score = cugraph.leiden(G)

Review comment (Contributor):

Can you also provide the value of parts so doctest (and users) can verify?

Reply from @jnke2016 (Contributor Author), May 23, 2023:
The reason will be displayed to describe this comment to others. Learn more.

Parts is a dataframe of length proportional to the number of vertices (in this case 33 for the karate datasets). That's pretty long
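
One possible middle ground for this exchange, sketched with the standard library only: Python's `doctest` module has an `ELLIPSIS` option that lets the expected output elide the middle of a long result, so a docstring can show only the first and last entries. The function `vertex_ids` below is a hypothetical stand-in for a long result and has nothing to do with cuGraph; Leiden output also depends on the random state, so this only illustrates the doctest mechanism, not a verified `parts` value.

```python
import doctest


def vertex_ids(n):
    """Return the ids 0..n-1 (hypothetical stand-in for a long result).

    >>> vertex_ids(10)  # doctest: +ELLIPSIS
    [0, 1, ..., 9]
    """
    return list(range(n))


def run_docstring_example():
    """Run the docstring example above and return (failed, attempted)."""
    parser = doctest.DocTestParser()
    test = parser.get_doctest(
        vertex_ids.__doc__, {"vertex_ids": vertex_ids}, "vertex_ids", None, 0
    )
    runner = doctest.DocTestRunner(verbose=False)
    results = runner.run(test)
    return results.failed, results.attempted


print(run_docstring_example())
```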

"""

    if input_graph.is_directed():
        raise ValueError("input graph must be undirected")

    # Return a client if one has started
    client = default_client()

    do_expensive_check = False

    result = [
        client.submit(
            _call_plc_leiden,
            Comms.get_session_id(),
            input_graph._plc_graph[w],
            max_iter,
            resolution,
            random_state,
            theta,
            do_expensive_check,
            workers=[w],
            allow_other_workers=False,
        )
        for w in Comms.get_workers()
    ]

    wait(result)

    part_mod_score = [client.submit(convert_to_cudf, r) for r in result]
    wait(part_mod_score)

    vertex_dtype = input_graph.edgelist.edgelist_df.dtypes[0]
    empty_df = cudf.DataFrame(
        {
            "vertex": numpy.empty(shape=0, dtype=vertex_dtype),
            "partition": numpy.empty(shape=0, dtype="int32"),
        }
    )

    part_mod_score = [delayed(lambda x: x, nout=2)(r) for r in part_mod_score]

    ddf = dask_cudf.from_delayed(
        [r[0] for r in part_mod_score], meta=empty_df, verify_meta=False
    ).persist()

    mod_score = dask.array.from_delayed(
        part_mod_score[0][1], shape=(1,), dtype=float
    ).compute()

    wait(ddf)
    wait(mod_score)

    wait([r.release() for r in part_mod_score])

    if input_graph.renumbered:
        ddf = input_graph.unrenumber(ddf, "vertex")

    return ddf, mod_score
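
The docstring in this file describes `resolution` as the gamma of the modularity formula and `modularity_score` as the global modularity of the partitioning. As a rough illustration of how gamma enters that score, here is a small pure-Python sketch of the standard generalized modularity for an undirected weighted edge list, Q = sum over communities of (internal weight / m) - gamma * (community degree / 2m)^2. The function name `modularity` and its inputs are hypothetical; this is not the cuGraph implementation, only the textbook formula.

```python
def modularity(edges, partition, resolution=1.0):
    """Generalized modularity of `partition` for undirected weighted `edges`.

    edges: iterable of (u, v, weight), each undirected edge listed once.
    partition: dict mapping vertex -> community id.
    """
    m = sum(w for _, _, w in edges)  # total edge weight
    internal = {}  # weight of edges with both endpoints in the community
    degree = {}    # sum of weighted degrees of the community's vertices

    for u, v, w in edges:
        cu, cv = partition[u], partition[v]
        degree[cu] = degree.get(cu, 0.0) + w
        degree[cv] = degree.get(cv, 0.0) + w
        if cu == cv:
            internal[cu] = internal.get(cu, 0.0) + w

    return sum(
        internal.get(c, 0.0) / m - resolution * (d / (2.0 * m)) ** 2
        for c, d in degree.items()
    )


# Two triangles joined by a single bridge edge (2, 3).
edges = [(0, 1, 1.0), (1, 2, 1.0), (0, 2, 1.0),
         (3, 4, 1.0), (4, 5, 1.0), (3, 5, 1.0),
         (2, 3, 1.0)]
two_communities = {0: 0, 1: 0, 2: 0, 3: 1, 4: 1, 5: 1}
print(modularity(edges, two_communities))                  # 5/14 at gamma = 1
print(modularity(edges, two_communities, resolution=2.0))  # lower score
```

Raising gamma increases the penalty on large communities, which is consistent with the docstring's note that higher resolutions lead to more, smaller communities.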
67 changes: 40 additions & 27 deletions python/cugraph/cugraph/dask/community/louvain.py
Expand Up @@ -13,28 +13,40 @@
# limitations under the License.
#

from __future__ import annotations

from dask.distributed import wait, default_client
import cugraph.dask.comms.comms as Comms
import dask_cudf
import dask
from dask import delayed
import cudf
import operator as op
import cupy as cp
import numpy

from pylibcugraph import ResourceHandle
from pylibcugraph import louvain as pylibcugraph_louvain
from typing import Tuple, TYPE_CHECKING

if TYPE_CHECKING:
    from cugraph import Graph


def convert_to_cudf(cupy_vertex, cupy_partition):
def convert_to_cudf(
    result: Tuple[cp.ndarray, cp.ndarray, float]
) -> Tuple[cudf.DataFrame, float]:
    """
    Creates a cudf DataFrame from cupy arrays from pylibcugraph wrapper
    """
    cupy_vertex, cupy_partition, modularity = result
    df = cudf.DataFrame()
    df["vertex"] = cupy_vertex
    df["partition"] = cupy_partition

    return df
    return df, modularity


def _call_plc_louvain(sID, mg_graph_x, max_iter, resolution, do_expensive_check):
def _call_plc_louvain(
    sID: bytes, mg_graph_x, max_iter: int, resolution: float, do_expensive_check: bool
) -> Tuple[cp.ndarray, cp.ndarray, float]:
    return pylibcugraph_louvain(
        resource_handle=ResourceHandle(Comms.get_handle(sID).getHandle()),
        graph=mg_graph_x,
Expand All @@ -44,7 +56,9 @@ def _call_plc_louvain(sID, mg_graph_x, max_iter, resolution, do_expensive_check)
    )


def louvain(input_graph, max_iter=100, resolution=1.0):
def louvain(
    input_graph: Graph, max_iter: int = 100, resolution: float = 1.0
) -> Tuple[dask_cudf.DataFrame, float]:
    """
    Compute the modularity optimizing partition of the input graph using the
    Louvain method
Expand Down Expand Up @@ -122,32 +136,31 @@ def louvain(input_graph, max_iter=100, resolution=1.0):

    wait(result)

    # futures is a list of Futures containing tuples of (DataFrame, mod_score),
    # unpack using separate calls to client.submit with a callable to get
    # individual items.
    # FIXME: look into an alternate way (not returning a tuples, accessing
    # tuples differently, etc.) since multiple client.submit() calls may not be
    # optimal.
    result_vertex = [client.submit(op.getitem, f, 0) for f in result]
    result_partition = [client.submit(op.getitem, f, 1) for f in result]
    mod_score = [client.submit(op.getitem, f, 2) for f in result]

    cudf_result = [
        client.submit(convert_to_cudf, cp_vertex_arrays, cp_partition_arrays)
        for cp_vertex_arrays, cp_partition_arrays in zip(
            result_vertex, result_partition
        )
    ]
    part_mod_score = [client.submit(convert_to_cudf, r) for r in result]
    wait(part_mod_score)

    vertex_dtype = input_graph.edgelist.edgelist_df.dtypes[0]
    empty_df = cudf.DataFrame(
        {
            "vertex": numpy.empty(shape=0, dtype=vertex_dtype),
            "partition": numpy.empty(shape=0, dtype="int32"),
        }
    )

    part_mod_score = [delayed(lambda x: x, nout=2)(r) for r in part_mod_score]

    ddf = dask_cudf.from_delayed(
        [r[0] for r in part_mod_score], meta=empty_df, verify_meta=False
    ).persist()

    wait(cudf_result)
    # Each worker should have computed the same mod_score
    mod_score = mod_score[0].result()
    mod_score = dask.array.from_delayed(
        part_mod_score[0][1], shape=(1,), dtype=float
    ).compute()

    ddf = dask_cudf.from_delayed(cudf_result).persist()
    wait(ddf)
    wait(mod_score)

    # Wait until the inactive futures are released
    wait([(r.release(), c_r.release()) for r, c_r in zip(result, cudf_result)])
    wait([r.release() for r in part_mod_score])

    if input_graph.renumbered:
        ddf = input_graph.unrenumber(ddf, "vertex")
Expand Down
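
The hunk above replaces three `op.getitem` submissions per worker with a single conversion step that returns a `(DataFrame, modularity)` pair, then takes the score from the first worker because every worker computes the same global value. The shape of that pattern can be sketched with the standard library alone. In this sketch a `ThreadPoolExecutor` stands in for the Dask client, plain lists stand in for cupy arrays and cudf DataFrames, and `fake_worker_result`, `to_rows`, and `gather` are hypothetical names; none of it reflects how Dask actually schedules or transfers data.

```python
from concurrent.futures import ThreadPoolExecutor


def fake_worker_result(worker_id):
    # Hypothetical stand-in for the per-worker pylibcugraph call: each worker
    # returns its local (vertices, partitions, modularity) tuple.
    vertices = [2 * worker_id, 2 * worker_id + 1]
    partitions = [worker_id, worker_id]
    modularity = 0.42  # the global score, identical on every worker
    return vertices, partitions, modularity


def to_rows(result):
    # Single conversion step: unpack the whole tuple once instead of
    # fetching each element with a separate task.
    vertices, partitions, modularity = result
    return list(zip(vertices, partitions)), modularity


def gather(num_workers):
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        futures = [pool.submit(fake_worker_result, w) for w in range(num_workers)]
        converted = [to_rows(f.result()) for f in futures]

    # Concatenate the per-worker parts; take the score from the first worker.
    rows = [row for part, _ in converted for row in part]
    score = converted[0][1]
    return rows, score


print(gather(2))
```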