From 4378e9f580b601fbbf47f761bb7acbd5dcbd1c34 Mon Sep 17 00:00:00 2001
From: Julio Perez <37191411+jperez999@users.noreply.github.com>
Date: Tue, 11 Apr 2023 14:39:55 -0400
Subject: [PATCH] uses compat everywhere to allow container bypass when GPUs
 not present (#277)

---
 merlin/core/compat/__init__.py | 34 +++++++++++---------
 merlin/core/utils.py           |  3 +-
 merlin/dtypes/mappings/cudf.py | 24 +++++++-------
 merlin/io/avro.py              |  2 +-
 merlin/io/csv.py               |  8 ++---
 merlin/io/dataframe_engine.py  |  6 +---
 merlin/io/dataset.py           |  8 +----
 merlin/io/fsspec_utils.py      |  8 ++---
 merlin/io/parquet.py           |  8 ++---
 merlin/io/shuffle.py           | 13 ++++----
 merlin/io/worker.py            |  7 ++---
 merlin/io/writer.py            |  6 +---
 merlin/table/conversions.py    | 57 ++++++++++++++++++++++++++++++++--
 tests/conftest.py              | 13 +++-----
 tests/unit/io/test_avro.py     |  8 +++--
 tests/unit/io/test_io.py       |  8 +++--
 16 files changed, 126 insertions(+), 87 deletions(-)

diff --git a/merlin/core/compat/__init__.py b/merlin/core/compat/__init__.py
index 8695873c4..dc5ec84ed 100644
--- a/merlin/core/compat/__init__.py
+++ b/merlin/core/compat/__init__.py
@@ -21,14 +21,14 @@
 from merlin.core.has_gpu import HAS_GPU  # noqa pylint: disable=unused-import
 
+if not cuda.is_available():
+    cuda = None
+
 try:
     import psutil
 except ImportError:
     psutil = None
 
-if not cuda.is_available():
-    cuda = None
-
 
 def pynvml_mem_size(kind="total", index=0):
     """Get Memory Info for device.
@@ -121,17 +121,23 @@ def device_mem_size(kind="total", cpu=False):
 except ImportError:
     pandas = None
 
-try:
-    import cupy
-except ImportError:
-    cupy = None
+if HAS_GPU:
+    try:
+        import cupy
+    except ImportError:
+        cupy = None
 
-try:
-    import cudf
-except ImportError:
-    cudf = None
+    try:
+        import cudf
+    except ImportError:
+        cudf = None
 
-try:
-    import dask_cudf
-except ImportError:
+    try:
+        import dask_cudf
+    except ImportError:
+        dask_cudf = None
+else:
+    # Without a GPU available, none of these packages should be used
+    cupy = None
+    cudf = None
     dask_cudf = None
diff --git a/merlin/core/utils.py b/merlin/core/utils.py
index 07ee26aea..ec219f26f 100644
--- a/merlin/core/utils.py
+++ b/merlin/core/utils.py
@@ -30,8 +30,7 @@
 from dask.distributed import Client, get_client
 from tqdm import tqdm
 
-from merlin.core.compat import device_mem_size  # noqa pylint: disable=unused-import
-from merlin.core.has_gpu import HAS_GPU
+from merlin.core.compat import HAS_GPU, cuda, device_mem_size  # noqa pylint: disable=unused-import
 
 _merlin_dask_client = ContextVar("_merlin_dask_client", default="auto")
 
diff --git a/merlin/dtypes/mappings/cudf.py b/merlin/dtypes/mappings/cudf.py
index 4452727cd..25de9d847 100644
--- a/merlin/dtypes/mappings/cudf.py
+++ b/merlin/dtypes/mappings/cudf.py
@@ -15,6 +15,7 @@
 #
 import numpy as np
 
+from merlin.core.compat import cudf
 from merlin.core.dispatch import is_string_dtype
 from merlin.dtypes.mapping import DTypeMapping, NumpyPreprocessor
 from merlin.dtypes.registry import _dtype_registry
@@ -41,17 +42,16 @@ def cudf_translator(raw_dtype) -> np.dtype:
         return category_type
 
 
-try:
-    # We only want to register this mapping if cudf is available, even though
-    # the mapping itself doesn't use cudf (yet?)
+if cudf:
+    try:
+        # We only want to register this mapping if cudf is available, even though
+        # the mapping itself doesn't use cudf (yet?)
-    import cudf  # pylint:disable=unused-import  # noqa: F401
+        cudf_dtypes = DTypeMapping(
+            translator=NumpyPreprocessor("cudf", cudf_translator, attrs=["_categories"]),
+        )
+        _dtype_registry.register("cudf", cudf_dtypes)
+    except ImportError as exc:
+        from warnings import warn
-
-    cudf_dtypes = DTypeMapping(
-        translator=NumpyPreprocessor("cudf", cudf_translator, attrs=["_categories"]),
-    )
-    _dtype_registry.register("cudf", cudf_dtypes)
-except ImportError as exc:
-    from warnings import warn
-
-    warn(f"cuDF dtype mappings did not load successfully due to an error: {exc.msg}")
+        warn(f"cuDF dtype mappings did not load successfully due to an error: {exc.msg}")
diff --git a/merlin/io/avro.py b/merlin/io/avro.py
index 1e2cfde11..7c66f6a37 100644
--- a/merlin/io/avro.py
+++ b/merlin/io/avro.py
@@ -15,11 +15,11 @@
 #
 import warnings
 
-import cudf
 import uavro as ua
 from dask.base import tokenize
 from dask.dataframe.core import new_dd_object
 
+from merlin.core.compat import cudf
 from merlin.io.dataset_engine import DatasetEngine
 
diff --git a/merlin/io/csv.py b/merlin/io/csv.py
index 8958930ed..86d1acd17 100644
--- a/merlin/io/csv.py
+++ b/merlin/io/csv.py
@@ -16,17 +16,13 @@
 import functools
 
 import dask.dataframe as dd
-
-try:
-    import dask_cudf
-except ImportError:
-    dask_cudf = None
-import numpy as np
 from dask.bytes import read_bytes
 from dask.utils import parse_bytes
 from fsspec.core import get_fs_token_paths
 from fsspec.utils import infer_compression
 
+from merlin.core.compat import dask_cudf
+from merlin.core.compat import numpy as np
 from merlin.io.dataset_engine import DatasetEngine
 
diff --git a/merlin/io/dataframe_engine.py b/merlin/io/dataframe_engine.py
index f63e1283e..003a947b2 100644
--- a/merlin/io/dataframe_engine.py
+++ b/merlin/io/dataframe_engine.py
@@ -13,14 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-try:
-    import cudf
-    import dask_cudf
-except ImportError:
-    cudf = None
 from dask.dataframe.core import new_dd_object
 from dask.highlevelgraph import HighLevelGraph
 
+from merlin.core.compat import cudf, dask_cudf
 from merlin.io.dataset_engine import DatasetEngine
 
diff --git a/merlin/io/dataset.py b/merlin/io/dataset.py
index e53861da4..472d9f6fa 100644
--- a/merlin/io/dataset.py
+++ b/merlin/io/dataset.py
@@ -30,7 +30,7 @@
 from fsspec.core import get_fs_token_paths
 from fsspec.utils import stringify_path
 
-from merlin.core.compat import HAS_GPU, device_mem_size
+from merlin.core.compat import HAS_GPU, cudf, device_mem_size
 from merlin.core.dispatch import (
     convert_data,
     hex_to_int,
@@ -49,12 +49,6 @@
 from merlin.schema import ColumnSchema, Schema
 from merlin.schema.io.tensorflow_metadata import TensorflowMetadata
 
-try:
-    import cudf
-except ImportError:
-    cudf = None
-
-
 MERLIN_METADATA_DIR_NAME = ".merlin"
 LOG = logging.getLogger("merlin")
 
diff --git a/merlin/io/fsspec_utils.py b/merlin/io/fsspec_utils.py
index 490d80ef9..ef1806dc2 100644
--- a/merlin/io/fsspec_utils.py
+++ b/merlin/io/fsspec_utils.py
@@ -28,8 +28,9 @@
 else:
     fsspec_parquet = None
 
-try:
-    import cudf
+from merlin.core.compat import cudf
+
+if cudf:
     from cudf.core.column import as_column, build_categorical_column
 
     if fsspec_parquet and (Version(cudf.__version__) < Version("21.12.0")):
@@ -37,8 +38,7 @@
         # reads from python-file objects. Avoid
         # using the `fsspec.parquet` module
         fsspec_parquet = None
-except ImportError:
-    cudf = None
+
 
 #
 # Parquet-Specific Utilities
diff --git a/merlin/io/parquet.py b/merlin/io/parquet.py
index 262f7bb2c..710b6ea57 100644
--- a/merlin/io/parquet.py
+++ b/merlin/io/parquet.py
@@ -27,13 +27,13 @@
 
 from packaging.version import Version
 
-try:
-    import cudf
+from merlin.core.compat import cudf
+
+if cudf:
     import dask_cudf
     from cudf.io.parquet import ParquetWriter as pwriter_cudf
     from dask_cudf.io.parquet import CudfEngine
-except ImportError:
-    cudf = None
+
 import dask
 import dask.dataframe as dd
 import fsspec
diff --git a/merlin/io/shuffle.py b/merlin/io/shuffle.py
index ae3c23e8a..fd754bbab 100644
--- a/merlin/io/shuffle.py
+++ b/merlin/io/shuffle.py
@@ -19,14 +19,15 @@
 import pandas as pd
 from packaging.version import Version
 
-_IGNORE_INDEX_SUPPORTED = Version(pd.__version__) >= Version("1.3.0")
+from merlin.core.compat import cudf
 
-try:
-    import cudf
+_IGNORE_INDEX_SUPPORTED = Version(pd.__version__) >= Version("1.3.0")
 
-    _CUDF_IGNORE_INDEX_SUPPORTED = Version(cudf.__version__) >= Version("22.04.0")
-except ImportError:
-    _CUDF_IGNORE_INDEX_SUPPORTED = None
+if cudf:
+    try:
+        _CUDF_IGNORE_INDEX_SUPPORTED = Version(cudf.__version__) >= Version("22.04.0")
+    except ImportError:
+        _CUDF_IGNORE_INDEX_SUPPORTED = None
 
 
 class Shuffle(enum.Enum):
diff --git a/merlin/io/worker.py b/merlin/io/worker.py
index 889c2e3c9..2a8566bec 100644
--- a/merlin/io/worker.py
+++ b/merlin/io/worker.py
@@ -18,14 +18,11 @@
 import threading
 
 import pandas as pd
-
-try:
-    import cudf
-except ImportError:
-    cudf = None
 import pyarrow as pa
 from dask.distributed import get_worker
 
+from merlin.core.compat import cudf
+
 # Use global variable as the default
 # cache when there are no distributed workers.
 # Use a thread lock to "handle" multiple Dask
diff --git a/merlin/io/writer.py b/merlin/io/writer.py
index c99154820..66c24170d 100644
--- a/merlin/io/writer.py
+++ b/merlin/io/writer.py
@@ -19,14 +19,10 @@
 import threading
 from typing import Optional
 
-try:
-    import cupy as cp
-except ImportError:
-    cp = None
-
 import numpy as np
 from fsspec.core import get_fs_token_paths
 
+from merlin.core.compat import cupy as cp
 from merlin.core.dispatch import annotate
 from merlin.io.shuffle import shuffle_df
 
diff --git a/merlin/table/conversions.py b/merlin/table/conversions.py
index 68cfa53d3..218af23a8 100644
--- a/merlin/table/conversions.py
+++ b/merlin/table/conversions.py
@@ -42,6 +42,20 @@ def _from_dlpack_gpu(to, capsule):
 
 
 def to_dlpack_col(column: TensorColumn, to_dlpack_fn: Callable) -> _DlpackColumn:
+    """Creates a DLPack representation of the supplied TensorColumn.
+
+    Parameters
+    ----------
+    column : TensorColumn
+        Original data to be put in DLPack capsule(s)
+    to_dlpack_fn : Callable
+        The function used to create the DLPack representation
+
+    Returns
+    -------
+    _DlpackColumn
+        A _DlpackColumn with values and offsets represented as DLPack capsules
+    """
     vals_cap = to_dlpack_fn(column.values)
     offs_cap = to_dlpack_fn(column.offsets) if column.offsets is not None else None
     return _DlpackColumn(vals_cap, offs_cap, column)
@@ -50,6 +64,24 @@ def to_dlpack_col(column: TensorColumn, to_dlpack_fn: Callable) -> _DlpackColumn
 def from_dlpack_col(
     dlpack_col: _DlpackColumn, from_dlpack_fn: Callable, target_col_type: Type, _unsafe: bool
 ) -> TensorColumn:
+    """Unwraps a DLPack representation of a TensorColumn and creates a
+    TensorColumn of the target_col_type. This function is used in conjunction
+    with to_dlpack_col.
+
+    Parameters
+    ----------
+    dlpack_col : _DlpackColumn
+        The DLPack representation of the original TensorColumn
+    from_dlpack_fn : Callable
+        Function containing the logic to unwrap the DLPack capsule
+    target_col_type : Type
+        Desired TensorColumn type to build from the DLPack representation.
+
+    Returns
+    -------
+    TensorColumn
+        A TensorColumn of type target_col_type.
+    """
     target_array_type = target_col_type.array_type()
 
     values = from_dlpack_fn(target_array_type, dlpack_col.values)
@@ -86,6 +118,25 @@ def convert_col(
     _from_dlpack_fn: Callable = None,
     _unsafe: bool = False,
 ):
+    """Convert a TensorColumn to a different TensorColumn type, using
+    DLPack (zero copy) to transfer the data between column types.
+
+    Parameters
+    ----------
+    column : TensorColumn
+        The column to be converted
+    target_type : Type
+        The type of TensorColumn to produce
+    _to_dlpack_fn : Callable, optional
+        cached to_dlpack function, by default None
+    _from_dlpack_fn : Callable, optional
+        cached from_dlpack function, by default None
+
+    Returns
+    -------
+    TensorColumn
+        A TensorColumn of the type identified by the target_type parameter.
+    """
     # If there's nothing to do, take a shortcut
     if isinstance(column, target_type):
         return column
@@ -103,6 +154,9 @@ def convert_col(
 
 
 def df_from_tensor_table(table):
+    """
+    Create a dataframe from a TensorTable
+    """
     device = "cpu" if table.device == Device.CPU else None
     df_dict = {}
     for col_name, col_data in table.items():
@@ -132,8 +186,7 @@ def _tensor_table_from_pandas_df(df: pd.DataFrame):
 
 @tensor_table_from_df.register_lazy("cudf")
 def _register_tensor_table_from_cudf_df():
-    import cudf
-
+    from merlin.core.compat import cudf
     from merlin.table import CupyColumn
 
     @tensor_table_from_df.register(cudf.DataFrame)
diff --git a/tests/conftest.py b/tests/conftest.py
index dda47d110..f81518365 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -19,14 +19,12 @@
 import random
 
 import dask
-import numpy as np
-import pandas as pd
 
-from merlin.core.compat import HAS_GPU
-
-try:
-    import cudf
+from merlin.core.compat import HAS_GPU, cudf
+from merlin.core.compat import numpy as np
+from merlin.core.compat import pandas as pd
 
+if cudf:
     try:
         import cudf.testing._utils
 
@@ -35,8 +33,7 @@
         import cudf.tests.utils
 
         assert_eq = cudf.tests.utils.assert_eq
-except ImportError:
-    cudf = None
+else:
 
     def assert_eq(a, b, *args, **kwargs):
         if isinstance(a, pd.DataFrame):
diff --git a/tests/unit/io/test_avro.py b/tests/unit/io/test_avro.py
index 740f29156..6037c216f 100644
--- a/tests/unit/io/test_avro.py
+++ b/tests/unit/io/test_avro.py
@@ -22,10 +22,12 @@
 from dask.dataframe.io.demo import names as name_list
 
 import merlin.io
+from merlin.core.compat import cudf
 
-cudf = pytest.importorskip("cudf")
-dask_cudf = pytest.importorskip("dask_cudf")
-
+if cudf:
+    dask_cudf = pytest.importorskip("dask_cudf")
+else:
+    pytestmark = pytest.mark.skip(reason="cudf did not import successfully")
 
 # Require uavro and fastavro library.
 # Note that fastavro is only required to write
 # avro files for testing, while uavro is actually
diff --git a/tests/unit/io/test_io.py b/tests/unit/io/test_io.py
index b0698b137..3ca32d8ff 100644
--- a/tests/unit/io/test_io.py
+++ b/tests/unit/io/test_io.py
@@ -30,14 +30,16 @@
 import merlin.dtypes as md
 import merlin.io
 from merlin.core import dispatch
-from merlin.core.compat import HAS_GPU
+from merlin.core.compat import HAS_GPU, cudf
 from merlin.io.parquet import GPUParquetWriter
 from merlin.schema.io.tensorflow_metadata import TensorflowMetadata
 from merlin.schema.tags import Tags, TagSet
 from tests.conftest import allcols_csv, mycols_csv, mycols_pq
 
-cudf = pytest.importorskip("cudf")
-dask_cudf = pytest.importorskip("dask_cudf")
+if cudf:
+    dask_cudf = pytest.importorskip("dask_cudf")
+else:
+    pytestmark = pytest.mark.skip(reason="cudf did not import successfully")
 
 if not HAS_GPU:
     pytestmark = pytest.mark.skip(reason="at least one visible CUDA GPU required.")
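
The hunks above route every optional GPU import through merlin.core.compat. A minimal, hypothetical sketch of the consumer-side pattern this standardizes (not part of the patch) follows; it assumes only what the diff shows, namely that merlin.core.compat exposes HAS_GPU, cudf, and pandas, with GPU packages set to None when no GPU is present. The helper name make_series is illustrative, not an API from this repository.

from merlin.core.compat import HAS_GPU, cudf
from merlin.core.compat import pandas as pd


def make_series(values):
    # Prefer cuDF when a GPU and the library are available; otherwise fall
    # back to pandas. Callers branch on None instead of wrapping their own
    # try/except ImportError blocks.
    if HAS_GPU and cudf is not None:
        return cudf.Series(values)
    return pd.Series(values)

Because compat guarantees these names always exist (possibly as None), the same module imports cleanly inside a CPU-only container, which is the bypass the subject line refers to.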
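
A second hedged sketch for the convert_col API documented in conversions.py: converting a NumPy-backed column to a CuPy-backed column through the DLPack path described in the new docstrings. NumpyColumn is assumed to live in merlin.table alongside the CupyColumn imported in the diff and to accept an array of values directly; whether a given source/target pair is supported depends on the dispatch registrations in conversions.py.

import numpy as np

from merlin.core.compat import cupy
from merlin.table import CupyColumn, NumpyColumn
from merlin.table.conversions import convert_col

cpu_col = NumpyColumn(np.arange(10))
if cupy is not None:
    # Transfers the column's data through DLPack capsules; passing a column
    # that is already the target type returns it unchanged (the shortcut in
    # convert_col).
    gpu_col = convert_col(cpu_col, CupyColumn)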