Skip to content

Commit

Permalink
uses compat everywhere to allow container bypass when gpus not present (
Browse files Browse the repository at this point in the history
  • Loading branch information
jperez999 authored Apr 11, 2023
1 parent fce0366 commit 4378e9f
Show file tree
Hide file tree
Showing 16 changed files with 126 additions and 87 deletions.
34 changes: 20 additions & 14 deletions merlin/core/compat/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@

from merlin.core.has_gpu import HAS_GPU # noqa pylint: disable=unused-import

if not cuda.is_available():
cuda = None

try:
import psutil
except ImportError:
psutil = None

if not cuda.is_available():
cuda = None


def pynvml_mem_size(kind="total", index=0):
"""Get Memory Info for device.
Expand Down Expand Up @@ -121,17 +121,23 @@ def device_mem_size(kind="total", cpu=False):
except ImportError:
pandas = None

try:
import cupy
except ImportError:
cupy = None
if HAS_GPU:
try:
import cupy
except ImportError:
cupy = None

try:
import cudf
except ImportError:
cudf = None
try:
import cudf
except ImportError:
cudf = None

try:
import dask_cudf
except ImportError:
try:
import dask_cudf
except ImportError:
dask_cudf = None
else:
# Without a GPU available none of these packages should be used
cupy = None
cudf = None
dask_cudf = None
3 changes: 1 addition & 2 deletions merlin/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@
from dask.distributed import Client, get_client
from tqdm import tqdm

from merlin.core.compat import device_mem_size # noqa pylint: disable=unused-import
from merlin.core.has_gpu import HAS_GPU
from merlin.core.compat import HAS_GPU, cuda, device_mem_size # noqa pylint: disable=unused-import

_merlin_dask_client = ContextVar("_merlin_dask_client", default="auto")

Expand Down
24 changes: 12 additions & 12 deletions merlin/dtypes/mappings/cudf.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#
import numpy as np

from merlin.core.compat import cudf
from merlin.core.dispatch import is_string_dtype
from merlin.dtypes.mapping import DTypeMapping, NumpyPreprocessor
from merlin.dtypes.registry import _dtype_registry
Expand All @@ -41,17 +42,16 @@ def cudf_translator(raw_dtype) -> np.dtype:
return category_type


try:
# We only want to register this mapping if cudf is available, even though
# the mapping itself doesn't use cudf (yet?)
if cudf:
try:
# We only want to register this mapping if cudf is available, even though
# the mapping itself doesn't use cudf (yet?)

import cudf # pylint:disable=unused-import # noqa: F401
cudf_dtypes = DTypeMapping(
translator=NumpyPreprocessor("cudf", cudf_translator, attrs=["_categories"]),
)
_dtype_registry.register("cudf", cudf_dtypes)
except ImportError as exc:
from warnings import warn

cudf_dtypes = DTypeMapping(
translator=NumpyPreprocessor("cudf", cudf_translator, attrs=["_categories"]),
)
_dtype_registry.register("cudf", cudf_dtypes)
except ImportError as exc:
from warnings import warn

warn(f"cuDF dtype mappings did not load successfully due to an error: {exc.msg}")
warn(f"cuDF dtype mappings did not load successfully due to an error: {exc.msg}")
2 changes: 1 addition & 1 deletion merlin/io/avro.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
#
import warnings

import cudf
import uavro as ua
from dask.base import tokenize
from dask.dataframe.core import new_dd_object

from merlin.core.compat import cudf
from merlin.io.dataset_engine import DatasetEngine


Expand Down
8 changes: 2 additions & 6 deletions merlin/io/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,13 @@
import functools

import dask.dataframe as dd

try:
import dask_cudf
except ImportError:
dask_cudf = None
import numpy as np
from dask.bytes import read_bytes
from dask.utils import parse_bytes
from fsspec.core import get_fs_token_paths
from fsspec.utils import infer_compression

from merlin.core.compat import dask_cudf
from merlin.core.compat import numpy as np
from merlin.io.dataset_engine import DatasetEngine


Expand Down
6 changes: 1 addition & 5 deletions merlin/io/dataframe_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
try:
import cudf
import dask_cudf
except ImportError:
cudf = None
from dask.dataframe.core import new_dd_object
from dask.highlevelgraph import HighLevelGraph

from merlin.core.compat import cudf, dask_cudf
from merlin.io.dataset_engine import DatasetEngine


Expand Down
8 changes: 1 addition & 7 deletions merlin/io/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from fsspec.core import get_fs_token_paths
from fsspec.utils import stringify_path

from merlin.core.compat import HAS_GPU, device_mem_size
from merlin.core.compat import HAS_GPU, cudf, device_mem_size
from merlin.core.dispatch import (
convert_data,
hex_to_int,
Expand All @@ -49,12 +49,6 @@
from merlin.schema import ColumnSchema, Schema
from merlin.schema.io.tensorflow_metadata import TensorflowMetadata

try:
import cudf
except ImportError:
cudf = None


MERLIN_METADATA_DIR_NAME = ".merlin"
LOG = logging.getLogger("merlin")

Expand Down
8 changes: 4 additions & 4 deletions merlin/io/fsspec_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,17 @@
else:
fsspec_parquet = None

try:
import cudf
from merlin.core.compat import cudf

if cudf:
from cudf.core.column import as_column, build_categorical_column

if fsspec_parquet and (Version(cudf.__version__) < Version("21.12.0")):
# This version of cudf does not support
# reads from python-file objects. Avoid
# using the `fsspec.parquet` module
fsspec_parquet = None
except ImportError:
cudf = None


#
# Parquet-Specific Utilities
Expand Down
8 changes: 4 additions & 4 deletions merlin/io/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@

from packaging.version import Version

try:
import cudf
from merlin.core.compat import cudf

if cudf:
import dask_cudf
from cudf.io.parquet import ParquetWriter as pwriter_cudf
from dask_cudf.io.parquet import CudfEngine
except ImportError:
cudf = None

import dask
import dask.dataframe as dd
import fsspec
Expand Down
13 changes: 7 additions & 6 deletions merlin/io/shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@
import pandas as pd
from packaging.version import Version

_IGNORE_INDEX_SUPPORTED = Version(pd.__version__) >= Version("1.3.0")
from merlin.core.compat import cudf

try:
import cudf
_IGNORE_INDEX_SUPPORTED = Version(pd.__version__) >= Version("1.3.0")

_CUDF_IGNORE_INDEX_SUPPORTED = Version(cudf.__version__) >= Version("22.04.0")
except ImportError:
_CUDF_IGNORE_INDEX_SUPPORTED = None
# `cudf` comes from merlin.core.compat and is None when cuDF is not usable.
# Always bind the flag so later references never hit a NameError: it is a
# bool when cuDF is present and None otherwise. No import happens here, so
# there is no ImportError to guard against.
_CUDF_IGNORE_INDEX_SUPPORTED = (
    Version(cudf.__version__) >= Version("22.04.0") if cudf else None
)


class Shuffle(enum.Enum):
Expand Down
7 changes: 2 additions & 5 deletions merlin/io/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,11 @@
import threading

import pandas as pd

try:
import cudf
except ImportError:
cudf = None
import pyarrow as pa
from dask.distributed import get_worker

from merlin.core.compat import cudf

# Use global variable as the default
# cache when there are no distributed workers.
# Use a thread lock to "handle" multiple Dask
Expand Down
6 changes: 1 addition & 5 deletions merlin/io/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,10 @@
import threading
from typing import Optional

try:
import cupy as cp
except ImportError:
cp = None

import numpy as np
from fsspec.core import get_fs_token_paths

from merlin.core.compat import cupy as cp
from merlin.core.dispatch import annotate
from merlin.io.shuffle import shuffle_df

Expand Down
57 changes: 55 additions & 2 deletions merlin/table/conversions.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,20 @@ def _from_dlpack_gpu(to, capsule):


def to_dlpack_col(column: TensorColumn, to_dlpack_fn: Callable) -> _DlpackColumn:
    """Build the DLPack form of a TensorColumn.

    Parameters
    ----------
    column : TensorColumn
        Source column whose ``values`` (and ``offsets``, when present) are
        exported.
    to_dlpack_fn : Callable
        Function applied to each array to produce its DLPack capsule.

    Returns
    -------
    _DlpackColumn
        Wrapper holding the values capsule, the offsets capsule (``None``
        when the column has no offsets) and a reference to ``column``.
    """
    values_capsule = to_dlpack_fn(column.values)

    # Offsets are optional; only export them when the column carries them.
    if column.offsets is None:
        offsets_capsule = None
    else:
        offsets_capsule = to_dlpack_fn(column.offsets)

    return _DlpackColumn(values_capsule, offsets_capsule, column)
Expand All @@ -50,6 +64,24 @@ def to_dlpack_col(column: TensorColumn, to_dlpack_fn: Callable) -> _DlpackColumn
def from_dlpack_col(
dlpack_col: _DlpackColumn, from_dlpack_fn: Callable, target_col_type: Type, _unsafe: bool
) -> TensorColumn:
"""Unwraps a DLpack representation of a TensorColumn and creates a
TensorColumn of the target_col_type. This function is used in conjunction
with to_dlpack_col.
Parameters
----------
dlpack_col : _DlpackColumn
The dlpack representation of the original TensorColumn
from_dlpack_fn : Callable
Function containing logic to unwrap dlpack capsule
target_col_type : Type
Desired TensorColumn return type from unwrap of dlpack representation.
Returns
-------
TensorColumn
A TensorColumn of type target_col_type.
"""
target_array_type = target_col_type.array_type()

values = from_dlpack_fn(target_array_type, dlpack_col.values)
Expand Down Expand Up @@ -86,6 +118,25 @@ def convert_col(
_from_dlpack_fn: Callable = None,
_unsafe: bool = False,
):
"""Convert a TensorColumn to a different TensorColumn type,
using DLPack (zero copy) to transfer data between TensorColumns
Parameters
----------
column : TensorColumn
The Column to be transformed
target_type : Type
The desired TensorColumn, to be produced
_to_dlpack_fn : Callable, optional
cached to_dlpack function, by default None
_from_dlpack_fn : Callable, optional
cached from_dlpack function, by default None
Returns
-------
TensorColumn
A TensorColumn of the type identified in target_type parameter.
"""
# If there's nothing to do, take a shortcut
if isinstance(column, target_type):
return column
Expand All @@ -103,6 +154,9 @@ def convert_col(


def df_from_tensor_table(table):
"""
Create a dataframe from a TensorTable
"""
device = "cpu" if table.device == Device.CPU else None
df_dict = {}
for col_name, col_data in table.items():
Expand Down Expand Up @@ -132,8 +186,7 @@ def _tensor_table_from_pandas_df(df: pd.DataFrame):

@tensor_table_from_df.register_lazy("cudf")
def _register_tensor_table_from_cudf_df():
import cudf

from merlin.core.compat import cudf
from merlin.table import CupyColumn

@tensor_table_from_df.register(cudf.DataFrame)
Expand Down
13 changes: 5 additions & 8 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@
import random

import dask
import numpy as np
import pandas as pd

from merlin.core.compat import HAS_GPU

try:
import cudf
from merlin.core.compat import HAS_GPU, cudf
from merlin.core.compat import numpy as np
from merlin.core.compat import pandas as pd

if cudf:
try:
import cudf.testing._utils

Expand All @@ -35,8 +33,7 @@
import cudf.tests.utils

assert_eq = cudf.tests.utils.assert_eq
except ImportError:
cudf = None
else:

def assert_eq(a, b, *args, **kwargs):
if isinstance(a, pd.DataFrame):
Expand Down
Loading

0 comments on commit 4378e9f

Please sign in to comment.