From 7fc55963d13c4effb336836eb26c0f290e275d28 Mon Sep 17 00:00:00 2001 From: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> Date: Wed, 18 Aug 2021 15:26:50 -0400 Subject: [PATCH 1/9] add gpu param --- dask_sql/context.py | 2 ++ dask_sql/input_utils/convert.py | 7 ++++--- dask_sql/input_utils/location.py | 8 ++++++-- dask_sql/physical/rel/custom/create_table.py | 6 ++++++ 4 files changed, 18 insertions(+), 5 deletions(-) diff --git a/dask_sql/context.py b/dask_sql/context.py index c6030814c..13432dc51 100644 --- a/dask_sql/context.py +++ b/dask_sql/context.py @@ -125,6 +125,7 @@ def create_table( format: str = None, persist: bool = True, schema_name: str = None, + gpu: bool = False, **kwargs, ): """ @@ -199,6 +200,7 @@ def create_table( table_name=table_name, format=format, persist=persist, + gpu=gpu, **kwargs, ) self.schema[schema_name].tables[table_name.lower()] = dc diff --git a/dask_sql/input_utils/convert.py b/dask_sql/input_utils/convert.py index 2bf258835..d1a5f5848 100644 --- a/dask_sql/input_utils/convert.py +++ b/dask_sql/input_utils/convert.py @@ -37,6 +37,7 @@ def to_dc( table_name: str, format: str = None, persist: bool = True, + gpu: bool = False, **kwargs, ) -> DataContainer: """ @@ -45,7 +46,7 @@ def to_dc( maybe persist them to cluster memory before. """ filled_get_dask_dataframe = lambda *args: cls._get_dask_dataframe( - *args, table_name=table_name, format=format, **kwargs, + *args, table_name=table_name, format=format, gpu=gpu, **kwargs, ) if isinstance(input_item, list): @@ -60,7 +61,7 @@ def to_dc( @classmethod def _get_dask_dataframe( - cls, input_item: InputType, table_name: str, format: str = None, **kwargs, + cls, input_item: InputType, table_name: str, format: str = None, gpu: bool = False, **kwargs, ): plugin_list = cls.get_plugins() @@ -69,7 +70,7 @@ def _get_dask_dataframe( input_item, table_name=table_name, format=format, **kwargs ): return plugin.to_dc( - input_item, table_name=table_name, format=format, **kwargs + input_item, table_name=table_name, format=format, gpu=gpu, **kwargs ) raise ValueError(f"Do not understand the input type {type(input_item)}") diff --git a/dask_sql/input_utils/location.py b/dask_sql/input_utils/location.py index 7d1ff1067..7c828d6ef 100644 --- a/dask_sql/input_utils/location.py +++ b/dask_sql/input_utils/location.py @@ -15,7 +15,7 @@ def is_correct_input( ): return isinstance(input_item, str) - def to_dc(self, input_item: Any, table_name: str, format: str = None, **kwargs): + def to_dc(self, input_item: Any, table_name: str, format: str = None, gpu: bool = False, **kwargs): if format == "memory": client = default_client() @@ -27,7 +27,11 @@ def to_dc(self, input_item: Any, table_name: str, format: str = None, **kwargs): format = extension.lstrip(".") try: - read_function = getattr(dd, f"read_{format}") + if gpu: + import dask_cudf + read_function = getattr(dask_cudf, f"read_{format}") + else: + read_function = getattr(dd, f"read_{format}") except AttributeError: raise AttributeError(f"Can not read files of format {format}") diff --git a/dask_sql/physical/rel/custom/create_table.py b/dask_sql/physical/rel/custom/create_table.py index 6151b3e17..ffb2ed83c 100644 --- a/dask_sql/physical/rel/custom/create_table.py +++ b/dask_sql/physical/rel/custom/create_table.py @@ -62,11 +62,17 @@ def convert( except KeyError: raise AttributeError("Parameters must include a 'location' parameter.") + try: + gpu = kwargs.pop("gpu") + except KeyError: + gpu = False + context.create_table( table_name, location, format=format, persist=persist, schema_name=schema_name, + gpu=gpu, **kwargs, ) From c0ec1be26dda683b5695b85559a10fe2d64f3818 Mon Sep 17 00:00:00 2001 From: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> Date: Wed, 18 Aug 2021 15:54:32 -0400 Subject: [PATCH 2/9] style updates --- dask_sql/input_utils/convert.py | 7 ++++++- dask_sql/input_utils/location.py | 9 ++++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/dask_sql/input_utils/convert.py b/dask_sql/input_utils/convert.py index d1a5f5848..5b2e8a797 100644 --- a/dask_sql/input_utils/convert.py +++ b/dask_sql/input_utils/convert.py @@ -61,7 +61,12 @@ def to_dc( @classmethod def _get_dask_dataframe( - cls, input_item: InputType, table_name: str, format: str = None, gpu: bool = False, **kwargs, + cls, + input_item: InputType, + table_name: str, + format: str = None, + gpu: bool = False, + **kwargs, ): plugin_list = cls.get_plugins() diff --git a/dask_sql/input_utils/location.py b/dask_sql/input_utils/location.py index 7c828d6ef..0ea45ff3c 100644 --- a/dask_sql/input_utils/location.py +++ b/dask_sql/input_utils/location.py @@ -15,7 +15,14 @@ def is_correct_input( ): return isinstance(input_item, str) - def to_dc(self, input_item: Any, table_name: str, format: str = None, gpu: bool = False, **kwargs): + def to_dc( + self, + input_item: Any, + table_name: str, + format: str = None, + gpu: bool = False, + **kwargs + ): if format == "memory": client = default_client() From bcd04cc618912145ce9924c3ef80c8381ccd69d8 Mon Sep 17 00:00:00 2001 From: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> Date: Wed, 18 Aug 2021 15:57:37 -0400 Subject: [PATCH 3/9] comma --- dask_sql/input_utils/location.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_sql/input_utils/location.py b/dask_sql/input_utils/location.py index 0ea45ff3c..c9b1ad38e 100644 --- a/dask_sql/input_utils/location.py +++ b/dask_sql/input_utils/location.py @@ -21,7 +21,7 @@ def to_dc( table_name: str, format: str = None, gpu: bool = False, - **kwargs + **kwargs, ): if format == "memory": From ee4ddfadb64c66d9f6d0c3629516564632b7e1c2 Mon Sep 17 00:00:00 2001 From: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> Date: Wed, 18 Aug 2021 16:00:34 -0400 Subject: [PATCH 4/9] sorry for the spam --- dask_sql/input_utils/location.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dask_sql/input_utils/location.py b/dask_sql/input_utils/location.py index c9b1ad38e..03e606176 100644 --- a/dask_sql/input_utils/location.py +++ b/dask_sql/input_utils/location.py @@ -36,6 +36,7 @@ def to_dc( try: if gpu: import dask_cudf + read_function = getattr(dask_cudf, f"read_{format}") else: read_function = getattr(dd, f"read_{format}") From cc1470395d649b78001d5b0d0e426398262b7c0c Mon Sep 17 00:00:00 2001 From: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> Date: Thu, 19 Aug 2021 10:51:14 -0400 Subject: [PATCH 5/9] Update dask_sql/physical/rel/custom/create_table.py Co-authored-by: Nils Braun --- dask_sql/physical/rel/custom/create_table.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/dask_sql/physical/rel/custom/create_table.py b/dask_sql/physical/rel/custom/create_table.py index ffb2ed83c..d459b4849 100644 --- a/dask_sql/physical/rel/custom/create_table.py +++ b/dask_sql/physical/rel/custom/create_table.py @@ -62,11 +62,7 @@ def convert( except KeyError: raise AttributeError("Parameters must include a 'location' parameter.") - try: - gpu = kwargs.pop("gpu") - except KeyError: - gpu = False - + gpu = kwargs.pop("gpu", False) context.create_table( table_name, location, From 82c4fa39ee7af7c62a5705d5e8ff3094a7e79fc3 Mon Sep 17 00:00:00 2001 From: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> Date: Thu, 19 Aug 2021 11:33:35 -0400 Subject: [PATCH 6/9] Update pandas.py --- dask_sql/input_utils/pandas.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/dask_sql/input_utils/pandas.py b/dask_sql/input_utils/pandas.py index bcef06c51..fc3d1bdc1 100644 --- a/dask_sql/input_utils/pandas.py +++ b/dask_sql/input_utils/pandas.py @@ -12,6 +12,18 @@ def is_correct_input( ): return isinstance(input_item, pd.DataFrame) or format == "dask" - def to_dc(self, input_item, table_name: str, format: str = None, **kwargs): + def to_dc( + self, + input_item, + table_name: str, + format: str = None, + gpu: bool = False, + **kwargs, + ): npartitions = kwargs.pop("npartitions", 1) - return dd.from_pandas(input_item, npartitions=npartitions, **kwargs) + if gpu: + import dask_cudf + + return dask_cudf.from_cudf(input_item, npartitions=npartitions, **kwargs) + else: + return dd.from_pandas(input_item, npartitions=npartitions, **kwargs) From b848a24b21c67c59f62de1c144c072c3a8e93542 Mon Sep 17 00:00:00 2001 From: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> Date: Thu, 19 Aug 2021 11:57:21 -0400 Subject: [PATCH 7/9] Update intake.py --- dask_sql/input_utils/intake.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/dask_sql/input_utils/intake.py b/dask_sql/input_utils/intake.py index 241f1de33..2204ae855 100644 --- a/dask_sql/input_utils/intake.py +++ b/dask_sql/input_utils/intake.py @@ -18,11 +18,21 @@ def is_correct_input( isinstance(input_item, intake.catalog.Catalog) or format == "intake" ) - def to_dc(self, input_item: Any, table_name: str, format: str = None, **kwargs): + def to_dc( + self, + input_item: Any, + table_name: str, + format: str = None, + gpu: bool = False, + **kwargs + ): table_name = kwargs.pop("intake_table_name", table_name) catalog_kwargs = kwargs.pop("catalog_kwargs", {}) if isinstance(input_item, str): input_item = intake.open_catalog(input_item, **catalog_kwargs) - return input_item[table_name].to_dask(**kwargs) + if gpu: + raise Exception("Intake does not support gpu") + else: + return input_item[table_name].to_dask(**kwargs) From 9d7f39c60e4cf0880878083643ac3f1b803770b1 Mon Sep 17 00:00:00 2001 From: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> Date: Thu, 19 Aug 2021 14:21:36 -0400 Subject: [PATCH 8/9] convert pd to cudf to dask_cudf --- dask_sql/input_utils/pandas.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/dask_sql/input_utils/pandas.py b/dask_sql/input_utils/pandas.py index fc3d1bdc1..d1f8cbae3 100644 --- a/dask_sql/input_utils/pandas.py +++ b/dask_sql/input_utils/pandas.py @@ -22,8 +22,12 @@ def to_dc( ): npartitions = kwargs.pop("npartitions", 1) if gpu: - import dask_cudf + import cudf, dask_cudf - return dask_cudf.from_cudf(input_item, npartitions=npartitions, **kwargs) + return dask_cudf.from_cudf( + cudf.from_pandas(input_item), + npartitions=npartitions, + **kwargs, + ) else: return dd.from_pandas(input_item, npartitions=npartitions, **kwargs) From fd41b05e47948f57f56f5e2079068c16e395527d Mon Sep 17 00:00:00 2001 From: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> Date: Thu, 19 Aug 2021 14:24:07 -0400 Subject: [PATCH 9/9] style update --- dask_sql/input_utils/pandas.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/dask_sql/input_utils/pandas.py b/dask_sql/input_utils/pandas.py index d1f8cbae3..14767357b 100644 --- a/dask_sql/input_utils/pandas.py +++ b/dask_sql/input_utils/pandas.py @@ -22,12 +22,11 @@ def to_dc( ): npartitions = kwargs.pop("npartitions", 1) if gpu: - import cudf, dask_cudf + import cudf + import dask_cudf return dask_cudf.from_cudf( - cudf.from_pandas(input_item), - npartitions=npartitions, - **kwargs, + cudf.from_pandas(input_item), npartitions=npartitions, **kwargs, ) else: return dd.from_pandas(input_item, npartitions=npartitions, **kwargs)