Skip to content

Commit

Permalink
Merge branch 'main' into fix/io-deps
Browse files Browse the repository at this point in the history
  • Loading branch information
karlhigley authored Feb 22, 2023
2 parents a3cff92 + 7f43dba commit 222d0d7
Show file tree
Hide file tree
Showing 35 changed files with 1,109 additions and 264 deletions.
7 changes: 4 additions & 3 deletions .github/workflows/docs-sched-rebuild.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
name: docs-sched-rebuild

on:
schedule:
# * is a special character in YAML so you have to quote this string
- cron: "0 0 * * *"
push:
branches: [main]
tags:
- v*
workflow_dispatch:

jobs:
Expand Down
6 changes: 3 additions & 3 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ repos:
hooks:
- id: absolufy-imports
- repo: https://github.com/timothycrosley/isort
rev: 5.11.2
rev: 5.12.0
hooks:
- id: isort
additional_dependencies: [toml]
Expand All @@ -26,11 +26,11 @@ repos:
exclude: ^docs/
# code style
- repo: https://github.com/python/black
rev: 22.12.0
rev: 23.1.0
hooks:
- id: black
- repo: https://github.com/pycqa/pylint
rev: v2.15.8
rev: v2.16.0
hooks:
- id: pylint
- repo: https://github.com/pycqa/flake8
Expand Down
2 changes: 1 addition & 1 deletion ci/pr.gpu.Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ pipeline {
agent {
docker {
image 'nvcr.io/nvstaging/merlin/merlin-ci-runner-wrapper'
label 'merlin_gpu'
label 'merlin_gpu_gcp || merlin_gpu'
registryCredentialsId 'jawe-nvcr-io'
registryUrl 'https://nvcr.io'
args "--runtime=nvidia -e NVIDIA_VISIBLE_DEVICES=all"
Expand Down
40 changes: 29 additions & 11 deletions merlin/core/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,37 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
try:
from numba import cuda
import os

try:
from numba import cuda # pylint: disable=unused-import
except ImportError:
cuda = None

HAS_GPU = False
try:
from dask.distributed.diagnostics import nvml
from dask.distributed.diagnostics import nvml

HAS_GPU = nvml.device_get_count() > 0
except ImportError:
# We can use `cuda` to set `HAS_GPU` now that we
# know `distributed` is not installed (otherwise
# the `nvml` import would have succeeded)
HAS_GPU = cuda is not None

def _get_gpu_count():
    """Return the number of GPU devices, honoring CUDA_VISIBLE_DEVICES."""
    # Use the `dask.distributed.diagnostics.nvml.device_get_count`
    # helper from dask to query device counts via NVML, since it
    # handles the complexity of checking NVML state for us.

    # Note: we can't use `numba.cuda.gpus`, because it has side effects
    # that are incompatible with Dask-CUDA. If CUDA runtime functions are
    # called before Dask-CUDA can spawn worker processes, then Dask-CUDA
    # will not work correctly (it raises an exception).
    device_count = nvml.device_get_count()
    if device_count == 0:
        return 0
    visible = os.environ.get("CUDA_VISIBLE_DEVICES")
    if visible is None:
        # Variable unset: every NVML-visible device is usable.
        return device_count
    # Set but empty means no devices are visible; otherwise count entries.
    return len(visible.split(",")) if visible else 0


HAS_GPU = _get_gpu_count() > 0
39 changes: 24 additions & 15 deletions merlin/core/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
if HAS_GPU:
try:
import cudf # type: ignore[no-redef]
import cupy as cp # type: ignore[no-redef]
import dask_cudf
import rmm # type: ignore[no-redef]
from cudf.core.column import as_column, build_column
Expand All @@ -47,10 +46,12 @@
# cudf < 21.08
from cudf.utils.dtypes import is_list_dtype as cudf_is_list_dtype
from cudf.utils.dtypes import is_string_dtype as cudf_is_string_dtype

except ImportError:
HAS_GPU = False

pass
try:
import cupy as cp # type: ignore[no-redef]
except ImportError:
pass

try:
# Dask >= 2021.5.1
Expand All @@ -76,7 +77,7 @@ def inner2(*args, **kwargs):
return inner1


if HAS_GPU:
if HAS_GPU and cudf:
DataFrameType = Union[pd.DataFrame, cudf.DataFrame] # type: ignore
SeriesType = Union[pd.Series, cudf.Series] # type: ignore
else:
Expand Down Expand Up @@ -415,7 +416,10 @@ def parquet_writer_dispatch(df: DataFrameLike, path=None, **kwargs):
elif cudf is not None:
_cls = cudf.io.parquet.ParquetWriter
else:
ValueError("Unable to load cudf. Please check your environment GPU and cudf available.")
raise ValueError(
"Unable to load cudf. "
"Please check that your environment has GPU(s) and cudf available."
)

if not path:
return _cls
Expand Down Expand Up @@ -489,16 +493,21 @@ def concat(objs, **kwargs):

def make_df(_like_df=None, device=None):
"""Return a DataFrame with the same dtype as `_like_df`"""
if not cudf or isinstance(_like_df, (pd.DataFrame, pd.Series)):
return pd.DataFrame(_like_df)
elif isinstance(_like_df, (cudf.DataFrame, cudf.Series)):
if not cudf or device == "cpu" or isinstance(_like_df, (pd.DataFrame, pd.Series)):
# move to pandas need it on CPU (host memory)
# can be a cudf, cupy or numpy Series
if cudf and isinstance(_like_df, (cudf.DataFrame, cudf.Series)):
# move to cpu
return _like_df.to_pandas()
if cp and isinstance(_like_df, cp.ndarray):
return pd.DataFrame(_like_df.get())
else:
return pd.DataFrame(_like_df)
else:
if isinstance(_like_df, dict) and len(_like_df) > 0:
if all(isinstance(v, pd.Series) for v in _like_df.values()):
return pd.DataFrame(_like_df)
return cudf.DataFrame(_like_df)
elif device is None and isinstance(_like_df, dict) and len(_like_df) > 0:
is_pandas = all(isinstance(v, pd.Series) for v in _like_df.values())
return pd.DataFrame(_like_df) if is_pandas else cudf.DataFrame(_like_df)
if device == "cpu":
return pd.DataFrame(_like_df)
return cudf.DataFrame(_like_df)


def make_series(_like_ser=None, device=None):
Expand Down
1 change: 1 addition & 0 deletions merlin/dag/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@
from merlin.dag.graph import Graph
from merlin.dag.node import Node, iter_nodes, postorder_iter_nodes, preorder_iter_nodes
from merlin.dag.selector import ColumnSelector
from merlin.dag.utils import group_values_offsets, ungroup_values_offsets
30 changes: 18 additions & 12 deletions merlin/dag/base_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,8 @@ def compute_output_schema(

output_schema = Schema()
for output_col_name, input_col_names in self.column_mapping(col_selector).items():
col_schema = ColumnSchema(output_col_name)
col_schema = self._compute_dtype(col_schema, input_schema[input_col_names])
col_schema = self._compute_tags(col_schema, input_schema[input_col_names])
col_schema = self._compute_properties(col_schema, input_schema[input_col_names])
input_schema_fragment = input_schema[input_col_names]
col_schema = self.compute_column_schema(output_col_name, input_schema_fragment)
output_schema += Schema([col_schema])

if self.dynamic_dtypes and prev_output_schema:
Expand Down Expand Up @@ -226,7 +224,12 @@ def column_mapping(self, col_selector):
return column_mapping

def compute_column_schema(self, col_name, input_schema):
methods = [self._compute_dtype, self._compute_tags, self._compute_properties]
methods = [
self._compute_dtype,
self._compute_tags,
self._compute_properties,
self._compute_shape,
]
return self._compute_column_schema(col_name, input_schema, methods=methods)

def _compute_column_schema(self, col_name, input_schema, methods=None):
Expand All @@ -239,21 +242,24 @@ def _compute_column_schema(self, col_name, input_schema, methods=None):

def _compute_dtype(self, col_schema, input_schema):
dtype = col_schema.dtype
is_list = col_schema.is_list
is_ragged = col_schema.is_ragged

if input_schema.column_schemas:
source_col_name = input_schema.column_names[0]
dtype = input_schema[source_col_name].dtype
is_list = input_schema[source_col_name].is_list
is_ragged = input_schema[source_col_name].is_ragged

if self.output_dtype is not None:
dtype = self.output_dtype
is_list = any(cs.is_list for _, cs in input_schema.column_schemas.items())
is_ragged = any(cs.is_ragged for _, cs in input_schema.column_schemas.items())

return col_schema.with_dtype(dtype, is_list=is_list, is_ragged=is_ragged)
return col_schema.with_dtype(dtype)

def _compute_shape(self, col_schema, input_schema):
shape = col_schema.shape

if input_schema.column_schemas:
source_col_name = input_schema.column_names[0]
shape = input_schema[source_col_name].shape

return col_schema.with_shape(shape)

@property
def dynamic_dtypes(self):
Expand Down
Loading

0 comments on commit 222d0d7

Please sign in to comment.