Use pre-commit for linting in GitHub Actions Workflow #184

Merged · 13 commits · Dec 15, 2022
3 changes: 0 additions & 3 deletions .github/workflows/cpu-ci.yml
@@ -32,9 +32,6 @@ jobs:
       - name: Install and upgrade python packages
         run: |
           python -m pip install --upgrade pip setuptools==59.4.0 wheel tox
-      - name: Lint with flake8, black, isort, interrogate, codespell
-        run: |
-          tox -e lint
       - name: Run tests
         run: |
           ref_type=${{ github.ref_type }}
16 changes: 16 additions & 0 deletions .github/workflows/lint.yaml
@@ -0,0 +1,16 @@
+name: lint
+
+on:
+  pull_request:
+  push:
+    branches: [main]
+    tags:
+      - v*
+
+jobs:
+  pre-commit:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v3
+      - uses: actions/setup-python@v4
+      - uses: pre-commit/action@v3.0.0
20 changes: 10 additions & 10 deletions .pre-commit-config.yaml
@@ -5,30 +5,30 @@ repos:
     hooks:
       - id: absolufy-imports
   - repo: https://github.com/timothycrosley/isort
-    rev: 5.10.1
+    rev: 5.11.2
     hooks:
       - id: isort
         additional_dependencies: [toml]
-        exclude: examples/*
+        exclude: ^examples/
   # types
   - repo: https://github.com/pre-commit/mirrors-mypy
-    rev: 'v0.971'
+    rev: 'v0.991'
     hooks:
       - id: mypy
         language_version: python3
-        args: [--no-strict-optional, --ignore-missing-imports, --show-traceback, --install-types, --non-interactive]
-        exclude: docs/*
+        args: [--non-interactive, --install-types, --namespace-packages, --explicit-package-bases]
+        exclude: ^docs/
   # code style
   - repo: https://github.com/python/black
-    rev: 22.6.0
+    rev: 22.12.0
     hooks:
       - id: black
   - repo: https://github.com/pycqa/pylint
-    rev: v2.14.5
+    rev: v2.15.8
     hooks:
       - id: pylint
-  - repo: https://gitlab.com/pycqa/flake8
-    rev: 3.9.2
+  - repo: https://github.com/pycqa/flake8
+    rev: 6.0.0
     hooks:
       - id: flake8
   # notebooks
@@ -45,7 +45,7 @@
         exclude: ^(build|docs|merlin/io|tests|setup.py|versioneer.py)
         args: [--config=pyproject.toml]
   - repo: https://github.com/codespell-project/codespell
-    rev: v2.2.1
+    rev: v2.2.2
     hooks:
       - id: codespell
   # security
1 change: 0 additions & 1 deletion .pylintrc
@@ -22,7 +22,6 @@ disable=fixme,
         # we'll probably never enable these checks
         invalid-name,
         import-error,
-        no-self-use,
 
         # disable code-complexity checks for now
         # TODO: should we configure the thresholds for these rather than just disable?
23 changes: 16 additions & 7 deletions merlin/core/dispatch.py
@@ -27,16 +27,16 @@
 from merlin.core.compat import HAS_GPU
 from merlin.core.protocols import DataFrameLike, DictLike, SeriesLike
 
-cp = None
 cudf = None
+cp = None
 rmm = None
 
 if HAS_GPU:
     try:
-        import cudf
-        import cupy as cp
+        import cudf  # type: ignore[no-redef]
+        import cupy as cp  # type: ignore[no-redef]
         import dask_cudf
-        import rmm
+        import rmm  # type: ignore[no-redef]
         from cudf.core.column import as_column, build_column
 
         try:
@@ -285,7 +285,7 @@ def list_val_dtype(ser: SeriesLike) -> np.dtype:
         The dtype of the innermost elements
     """
     if is_list_dtype(ser):
-        if HAS_GPU and isinstance(ser, cudf.Series):
+        if cudf is not None and isinstance(ser, cudf.Series):
             if is_list_dtype(ser):
                 ser = ser.list.leaves
             return ser.dtype
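
For context, a minimal sketch of how `list_val_dtype` is used on the CPU path (not part of the diff; it assumes the collapsed pandas branch of this function returns the leaf dtype the same way the cudf branch shown here does):

```python
import pandas as pd

from merlin.core.dispatch import list_val_dtype

# A list-typed Series: the helper reports the innermost element dtype.
ser = pd.Series([[0, 1], [2, 3]])
print(list_val_dtype(ser))  # expected: int64
```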
@@ -386,7 +386,14 @@ def read_dispatch(df: DataFrameLike = None, cpu=None, collection=False, fmt="parquet"):
     if cpu or isinstance(df, pd.DataFrame) or not HAS_GPU:
         _mod = dd if collection else pd
     else:
-        _mod = dask_cudf if collection else cudf.io
+        if collection:
+            _mod = dask_cudf
+        elif cudf is not None:
+            _mod = cudf.io
+        else:
+            raise ValueError(
+                "Unable to load cudf. Please check that your environment has a GPU and that cudf is installed."
+            )
     _attr = "read_csv" if fmt == "csv" else "read_parquet"
     return getattr(_mod, _attr)

@@ -405,8 +412,10 @@ def parquet_writer_dispatch(df: DataFrameLike, path=None, **kwargs):
         _cls = pq.ParquetWriter
         if path:
             _args.append(pa.Table.from_pandas(df, preserve_index=False).schema)
-    else:
+    elif cudf is not None:
         _cls = cudf.io.parquet.ParquetWriter
+    else:
+        raise ValueError(
+            "Unable to load cudf. Please check that your environment has a GPU and that cudf is installed."
+        )
 
     if not path:
         return _cls
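
To illustrate the dispatch behavior on the CPU path (a sketch, not part of the diff; it assumes only that pandas is installed, since a pandas DataFrame routes around cudf entirely):

```python
import pandas as pd

from merlin.core.dispatch import parquet_writer_dispatch, read_dispatch

df = pd.DataFrame({"a": [1, 2, 3]})

# A pandas DataFrame (or cpu=True) selects the pandas reader, so the
# new "Unable to load cudf" branch is never reached on this path.
reader = read_dispatch(df, fmt="parquet")  # -> pd.read_parquet
writer_cls = parquet_writer_dispatch(df)   # -> pyarrow.parquet.ParquetWriter
```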
123 changes: 102 additions & 21 deletions merlin/core/utils.py
@@ -22,8 +22,10 @@
 import warnings
 import zipfile
 from contextvars import ContextVar
+from typing import Any, Callable, Optional
 
 import dask
+import distributed
 from dask.dataframe.optimize import optimize as dd_optimize
 from dask.distributed import Client, get_client
 from tqdm import tqdm
@@ -40,6 +42,25 @@
 
 
 def pynvml_mem_size(kind="total", index=0):
+    """Get memory information for a device.
+
+    Parameters
+    ----------
+    kind : str, optional
+        Either "free" or "total", by default "total"
+    index : int, optional
+        Device index, by default 0
+
+    Returns
+    -------
+    int
+        Either free or total memory on the device, depending on the ``kind`` parameter.
+
+    Raises
+    ------
+    ValueError
+        When ``kind`` is not one of {"free", "total"}
+    """
     import pynvml
 
     pynvml.nvmlInit()
@@ -49,13 +70,31 @@ def pynvml_mem_size(kind="total", index=0):
     elif kind == "total":
         size = int(pynvml.nvmlDeviceGetMemoryInfo(pynvml.nvmlDeviceGetHandleByIndex(index)).total)
     else:
-        raise ValueError("{0} not a supported option for device_mem_size.".format(kind))
+        raise ValueError(f"{kind} not a supported option for device_mem_size.")
     pynvml.nvmlShutdown()
     return size
 
 
 def device_mem_size(kind="total", cpu=False):
+    """Get memory information for either the CPU or a GPU.
+
+    Parameters
+    ----------
+    kind : str, optional
+        Either "total" or "free", by default "total"
+    cpu : bool, optional
+        Whether to check CPU rather than GPU memory, by default False
+
+    Returns
+    -------
+    int
+        Free or total memory on the device
+
+    Raises
+    ------
+    ValueError
+        When ``kind`` is given an unsupported value.
+    """
     # Use psutil (if available) for cpu mode
     if cpu and psutil:
         if kind == "total":
@@ -68,7 +107,7 @@ def device_mem_size(kind="total", cpu=False):
         return int(1e9)
 
     if kind not in ["free", "total"]:
-        raise ValueError("{0} not a supported option for device_mem_size.".format(kind))
+        raise ValueError(f"{kind} not a supported option for device_mem_size.")
     try:
         if kind == "free":
             return int(cuda.current_context().get_memory_info()[0])
@@ -79,7 +118,7 @@ def device_mem_size(kind="total", cpu=False):
             # Not using NVML "free" memory, because it will not include RMM-managed memory
             warnings.warn("get_memory_info is not supported. Using total device memory from NVML.")
         size = pynvml_mem_size(kind="total", index=0)
-        return size
+    return size
 
 
 def get_rmm_size(size):
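
A minimal usage sketch for the documented functions (assuming psutil is installed for the CPU path; the GPU path needs a CUDA device):

```python
from merlin.core.utils import device_mem_size

# CPU mode uses psutil when available, otherwise a conservative
# 1e9-byte fallback (see the code above).
total_bytes = device_mem_size(kind="total", cpu=True)
print(total_bytes)

# GPU mode queries the CUDA context, falling back to NVML totals;
# any kind other than "free"/"total" raises ValueError.
# free_bytes = device_mem_size(kind="free")
```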
@@ -129,18 +168,39 @@ def report(chunk, chunksize, total):
 
 
 def ensure_optimize_dataframe_graph(ddf=None, dsk=None, keys=None):
-    # Perform HLG DataFrame optimizations
-    #
-    # If `ddf` is specified, an optimized Dataframe
-    # collection will be returned. If `dsk` and `keys`
-    # are specified, an optimized graph will be returned.
-    #
-    # These optimizations are performed automatically
-    # when a DataFrame collection is computed/persisted,
-    # but they are NOT always performed when statistics
-    # are computed. The purpose of this utility is to
-    # ensure that the Dataframe-based optimizations are
-    # always applied.
+    """Perform HLG DataFrame optimizations.
+
+    If `ddf` is specified, an optimized DataFrame
+    collection will be returned. If `dsk` and `keys`
+    are specified, an optimized graph will be returned.
+
+    These optimizations are performed automatically
+    when a DataFrame collection is computed/persisted,
+    but they are NOT always performed when statistics
+    are computed. The purpose of this utility is to
+    ensure that the DataFrame-based optimizations are
+    always applied.
+
+    Parameters
+    ----------
+    ddf : dask_cudf.DataFrame, optional
+        The DataFrame to optimize, by default None
+    dsk : dask.highlevelgraph.HighLevelGraph, optional
+        Dask high-level graph, by default None
+    keys : List[str], optional
+        The keys to optimize, by default None
+
+    Returns
+    -------
+    Union[dask_cudf.DataFrame, dask.highlevelgraph.HighLevelGraph]
+        A dask_cudf DataFrame or a Dask HighLevelGraph, depending
+        on the parameters provided.
+
+    Raises
+    ------
+    ValueError
+        If `ddf` is not provided and either `dsk` or `keys` is None.
+    """
 
     if ddf is None:
         if dsk is None or keys is None:
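
A short sketch of the two call patterns described in the new docstring (illustrative only; it assumes a plain dask.dataframe collection, which exercises the same HLG optimization path):

```python
import dask.dataframe as dd
import pandas as pd

from merlin.core.utils import ensure_optimize_dataframe_graph

ddf = dd.from_pandas(pd.DataFrame({"a": range(10)}), npartitions=2)

# Collection in, optimized collection out ...
ddf_opt = ensure_optimize_dataframe_graph(ddf=ddf)

# ... or pass a raw graph plus the keys to optimize.
dsk_opt = ensure_optimize_dataframe_graph(dsk=ddf.dask, keys=ddf.__dask_keys__())
```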
@@ -394,7 +454,8 @@ def set_client_deprecated(client, caller_str):
 
 
 def set_dask_client(client="auto", new_cluster=None, force_new=False, **cluster_options):
-    """Set the Dask-Distributed client
+    """Set the Dask-Distributed client.
+
     Parameters
     ----------
     client : {"auto", None} or `dask.distributed.Client`
@@ -454,11 +515,18 @@ def set_dask_client(client="auto", new_cluster=None, force_new=False, **cluster_options):
     return None if active == "auto" else active
 
 
-def global_dask_client():
+def global_dask_client() -> Optional[distributed.Client]:
+    """Get the global Dask client, if one has been set.
+
+    Returns
+    -------
+    Optional[distributed.Client]
+        The global client.
+    """
     # First, check _merlin_dask_client
     merlin_client = _merlin_dask_client.get()
     if merlin_client and merlin_client != "auto":
-        if merlin_client.cluster and merlin_client.cluster.workers:
+        if merlin_client.cluster and merlin_client.cluster.workers:  # type: ignore
             # Active Dask client already known
             return merlin_client
         else:
@@ -471,14 +539,27 @@ def global_dask_client():
             set_dask_client(get_client())
             return _merlin_dask_client.get()
         except ValueError:
+            # no global client found
             pass
     # Catch-all
     return None
 
 
-def run_on_worker(func, *args, **kwargs):
-    # Run a function on a Dask worker using `delayed`
-    # execution (if a Dask client is detected)
+def run_on_worker(func: Callable, *args, **kwargs) -> Any:
+    """Run a function on a Dask worker using `delayed`
+    execution (if a Dask client is detected).
+
+    Parameters
+    ----------
+    func : Callable
+        The function to run
+
+    Returns
+    -------
+    Any
+        The result of the function call with the supplied arguments
+    """
 
     if global_dask_client():
         # There is a specified or global Dask client. Use it
         return dask.delayed(func)(*args, **kwargs).compute()
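
A sketch tying the three client helpers together (it assumes dask.distributed is installed; the LocalCluster is purely illustrative):

```python
from dask.distributed import Client, LocalCluster

from merlin.core.utils import global_dask_client, run_on_worker, set_dask_client

client = Client(LocalCluster(n_workers=1))
set_dask_client(client)
assert global_dask_client() is client

# With a client set, this runs as a delayed task on the cluster;
# without one, it falls back to calling the function directly.
print(run_on_worker(sum, [1, 2, 3]))  # 6
```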
5 changes: 2 additions & 3 deletions merlin/dag/base_operator.py
@@ -186,7 +186,6 @@ def validate_schemas(
         strict_dtypes : Boolean, optional
             Enables strict checking for column dtype matching if True, by default False
         """
-        ...
 
     def transform(
         self, col_selector: ColumnSelector, transformable: Transformable
@@ -313,8 +312,8 @@ def output_column_names(self, col_selector: ColumnSelector) -> ColumnSelector:
 
     @property
     def dependencies(self) -> List[Union[str, Any]]:
-        """Defines an optional list of column dependencies for this operator. This lets you consume columns
-        that aren't part of the main transformation workflow.
+        """Defines an optional list of column dependencies for this operator.
+        This lets you consume columns that aren't part of the main transformation workflow.
 
         Returns
         -------
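
A hypothetical operator showing how the `dependencies` property might be overridden (the operator and column names below are invented for illustration, not part of this PR):

```python
from typing import Any, List, Union

from merlin.dag.base_operator import BaseOperator


class WeightedOp(BaseOperator):
    @property
    def dependencies(self) -> List[Union[str, Any]]:
        # Consume a "sample_weight" column that is not part of the
        # main transformation workflow.
        return ["sample_weight"]
```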