From 1ce8262d5081d31adbc76f3587f835ade318dfe2 Mon Sep 17 00:00:00 2001 From: jnke2016 Date: Tue, 29 Aug 2023 11:54:47 -0700 Subject: [PATCH 1/9] the keys that are held by the client are no longer strings but tuples --- python/cugraph/cugraph/dask/common/part_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cugraph/cugraph/dask/common/part_utils.py b/python/cugraph/cugraph/dask/common/part_utils.py index fda7e257367..f0a48cb99d3 100644 --- a/python/cugraph/cugraph/dask/common/part_utils.py +++ b/python/cugraph/cugraph/dask/common/part_utils.py @@ -89,7 +89,7 @@ def get_persisted_df_worker_map(dask_df, client): ddf_keys = futures_of(dask_df) output_map = {} for w, w_keys in client.has_what().items(): - output_map[w] = [ddf_k for ddf_k in ddf_keys if str(ddf_k.key) in w_keys] + output_map[w] = [ddf_k for ddf_k in ddf_keys if ddf_k.key in w_keys] if len(output_map[w]) == 0: output_map[w] = _create_empty_dask_df_future(dask_df._meta, client, w) return output_map From 7f6283a77a46fcc62bec5dc751de2a2875382f9f Mon Sep 17 00:00:00 2001 From: jnke2016 Date: Tue, 29 Aug 2023 16:14:16 -0700 Subject: [PATCH 2/9] remove assumption on keys type --- python/cugraph/cugraph/dask/common/part_utils.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/cugraph/cugraph/dask/common/part_utils.py b/python/cugraph/cugraph/dask/common/part_utils.py index f0a48cb99d3..ad11b644fb6 100644 --- a/python/cugraph/cugraph/dask/common/part_utils.py +++ b/python/cugraph/cugraph/dask/common/part_utils.py @@ -73,7 +73,7 @@ def persist_distributed_data(dask_df, client): _keys = dask_df.__dask_keys__() worker_dict = {} for i, key in enumerate(_keys): - worker_dict[str(key)] = tuple([worker_addresses[i]]) + worker_dict[key] = tuple([worker_addresses[i]]) persisted = client.persist(dask_df, workers=worker_dict) parts = futures_of(persisted) return parts @@ -167,7 +167,7 @@ async def _extract_partitions( parts = client.compute([p for p in zip(*raveled)]) await wait(parts) - key_to_part = [(str(part.key), part) for part in parts] + key_to_part = [(part.key, part) for part in parts] who_has = await client.who_has(parts) return [(first(who_has[key]), part) for key, part in key_to_part] @@ -229,7 +229,7 @@ def load_balance_func(ddf_, by, client=None): wait(parts) who_has = client.who_has(parts) - key_to_part = [(str(part.key), part) for part in parts] + key_to_part = [(part.key, part) for part in parts] gpu_fututres = [ (first(who_has[key]), part.key[1], part) for key, part in key_to_part ] @@ -283,7 +283,7 @@ def get_delayed_dict(ddf): """ df_delayed = {} for delayed_obj in ddf.to_delayed(): - df_delayed[str(delayed_obj.key)] = delayed_obj + df_delayed[delayed_obj.key] = delayed_obj return df_delayed From 88c5bc7a69d7390294f57620206876c2dee52876 Mon Sep 17 00:00:00 2001 From: jnke2016 Date: Wed, 30 Aug 2023 14:04:44 -0700 Subject: [PATCH 3/9] dummy commit --- python/cugraph/cugraph/dask/common/part_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cugraph/cugraph/dask/common/part_utils.py b/python/cugraph/cugraph/dask/common/part_utils.py index ad11b644fb6..ff382603b53 100644 --- a/python/cugraph/cugraph/dask/common/part_utils.py +++ b/python/cugraph/cugraph/dask/common/part_utils.py @@ -289,7 +289,7 @@ def get_delayed_dict(ddf): def concat_within_workers(client, ddf): """ - Concats all partitions within workers without transfers + Concats all partitions within workers without transfers. """ df_delayed = get_delayed_dict(ddf) From a8d7559bde7abe5a4cec1dbf90885941df1860fc Mon Sep 17 00:00:00 2001 From: jnke2016 Date: Wed, 30 Aug 2023 14:07:26 -0700 Subject: [PATCH 4/9] dummy commit --- python/cugraph/cugraph/dask/common/part_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cugraph/cugraph/dask/common/part_utils.py b/python/cugraph/cugraph/dask/common/part_utils.py index ff382603b53..ee2c430e9d3 100644 --- a/python/cugraph/cugraph/dask/common/part_utils.py +++ b/python/cugraph/cugraph/dask/common/part_utils.py @@ -279,7 +279,7 @@ def concat_dfs(df_list): def get_delayed_dict(ddf): """ Returns a dicitionary with the dataframe tasks as keys and - the dataframe delayed objects as values + the dataframe delayed objects as values. """ df_delayed = {} for delayed_obj in ddf.to_delayed(): From 39e8ba15aee6630e84e15447b870f3fd2fbe8e4a Mon Sep 17 00:00:00 2001 From: jnke2016 Date: Wed, 30 Aug 2023 14:46:22 -0700 Subject: [PATCH 5/9] dummy commit --- python/cugraph/cugraph/dask/common/part_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cugraph/cugraph/dask/common/part_utils.py b/python/cugraph/cugraph/dask/common/part_utils.py index ee2c430e9d3..b155e59dfa7 100644 --- a/python/cugraph/cugraph/dask/common/part_utils.py +++ b/python/cugraph/cugraph/dask/common/part_utils.py @@ -271,7 +271,7 @@ def load_balance_func(ddf_, by, client=None): def concat_dfs(df_list): """ - Concat a list of cudf dataframes + Concat a list of cudf dataframes. """ return cudf.concat(df_list) From f2237f975beaf5e170aad1b47500d9b07aa5867a Mon Sep 17 00:00:00 2001 From: jnke2016 Date: Wed, 30 Aug 2023 14:54:23 -0700 Subject: [PATCH 6/9] dummy commit --- python/cugraph/cugraph/dask/common/part_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cugraph/cugraph/dask/common/part_utils.py b/python/cugraph/cugraph/dask/common/part_utils.py index b155e59dfa7..5bba27da413 100644 --- a/python/cugraph/cugraph/dask/common/part_utils.py +++ b/python/cugraph/cugraph/dask/common/part_utils.py @@ -157,7 +157,7 @@ async def _extract_partitions( # NOTE: We colocate (X, y) here by zipping delayed # n partitions of them as (X1, y1), (X2, y2)... # and asking client to compute a single future for - # each tuple in the list + # each tuple in the list. dela = [np.asarray(d.to_delayed()) for d in dask_obj] # TODO: ravel() is causing strange behavior w/ delayed Arrays which are From 52388aacb692586c1be570d245973f1fcfb931d0 Mon Sep 17 00:00:00 2001 From: jnke2016 Date: Wed, 30 Aug 2023 14:59:02 -0700 Subject: [PATCH 7/9] dummy commit --- python/cugraph/cugraph/dask/common/part_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cugraph/cugraph/dask/common/part_utils.py b/python/cugraph/cugraph/dask/common/part_utils.py index 5bba27da413..7c0aad6c3ee 100644 --- a/python/cugraph/cugraph/dask/common/part_utils.py +++ b/python/cugraph/cugraph/dask/common/part_utils.py @@ -245,7 +245,7 @@ def load_balance_func(ddf_, by, client=None): for cumsum in cumsum_parts: num_rows.append(cumsum.iloc[-1]) - # Calculate current partition divisions + # Calculate current partition divisions. divisions = [sum(num_rows[0:x:1]) for x in range(0, len(num_rows) + 1)] divisions[-1] = divisions[-1] - 1 divisions = tuple(divisions) From 13b3c36979adee60ac2e91cccc9233b7bafc433a Mon Sep 17 00:00:00 2001 From: jnke2016 Date: Wed, 30 Aug 2023 16:01:35 -0700 Subject: [PATCH 8/9] dummy commit --- python/cugraph/cugraph/dask/common/part_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cugraph/cugraph/dask/common/part_utils.py b/python/cugraph/cugraph/dask/common/part_utils.py index 7c0aad6c3ee..a9d76ae21ec 100644 --- a/python/cugraph/cugraph/dask/common/part_utils.py +++ b/python/cugraph/cugraph/dask/common/part_utils.py @@ -220,7 +220,7 @@ def repartition(ddf, cumsum): def load_balance_func(ddf_, by, client=None): # Load balances the sorted dask_cudf DataFrame. - # Input is a dask_cudf dataframe ddf_ which is sorted by + # Input is a dask_cudf dataframe ddf_ which is sorted by # the column name passed as the 'by' argument. client = default_client() if client is None else client From dcf5218cb8215da7afa86db50b03b37ba6870daa Mon Sep 17 00:00:00 2001 From: Alexandria Barghi Date: Thu, 7 Sep 2023 12:28:34 -0700 Subject: [PATCH 9/9] style --- python/cugraph/cugraph/dask/common/part_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cugraph/cugraph/dask/common/part_utils.py b/python/cugraph/cugraph/dask/common/part_utils.py index a9d76ae21ec..7c0aad6c3ee 100644 --- a/python/cugraph/cugraph/dask/common/part_utils.py +++ b/python/cugraph/cugraph/dask/common/part_utils.py @@ -220,7 +220,7 @@ def repartition(ddf, cumsum): def load_balance_func(ddf_, by, client=None): # Load balances the sorted dask_cudf DataFrame. - # Input is a dask_cudf dataframe ddf_ which is sorted by + # Input is a dask_cudf dataframe ddf_ which is sorted by # the column name passed as the 'by' argument. client = default_client() if client is None else client