Use pre-commit for linting in GitHub Actions Workflow #184

Merged · 13 commits · Dec 15, 2022
Changes from all commits
3 changes: 0 additions & 3 deletions .github/workflows/cpu-ci.yml
@@ -32,9 +32,6 @@ jobs:
- name: Install and upgrade python packages
run: |
python -m pip install --upgrade pip setuptools==59.4.0 wheel tox
- name: Lint with flake8, black, isort, interrogate, codespell
run: |
tox -e lint
- name: Run tests
run: |
ref_type=${{ github.ref_type }}
16 changes: 16 additions & 0 deletions .github/workflows/lint.yaml
@@ -0,0 +1,16 @@
name: lint

on:
pull_request:
push:
branches: [main]
tags:
- v*

jobs:
pre-commit:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
- uses: pre-commit/action@v3.0.0
20 changes: 10 additions & 10 deletions .pre-commit-config.yaml
@@ -5,30 +5,30 @@ repos:
hooks:
- id: absolufy-imports
- repo: https://github.com/timothycrosley/isort
rev: 5.10.1
rev: 5.11.2
hooks:
- id: isort
additional_dependencies: [toml]
exclude: examples/*
exclude: ^examples/
# types
- repo: https://github.com/pre-commit/mirrors-mypy
rev: 'v0.971'
rev: 'v0.991'
hooks:
- id: mypy
language_version: python3
args: [--no-strict-optional, --ignore-missing-imports, --show-traceback, --install-types, --non-interactive]
exclude: docs/*
args: [--non-interactive, --install-types, --namespace-packages, --explicit-package-bases]
exclude: ^docs/
# code style
- repo: https://github.com/python/black
rev: 22.6.0
rev: 22.12.0
hooks:
- id: black
- repo: https://github.com/pycqa/pylint
rev: v2.14.5
rev: v2.15.8
hooks:
- id: pylint
- repo: https://gitlab.com/pycqa/flake8
rev: 3.9.2
- repo: https://github.com/pycqa/flake8
rev: 6.0.0
hooks:
- id: flake8
# notebooks
@@ -45,7 +45,7 @@ repos:
exclude: ^(build|docs|merlin/io|tests|setup.py|versioneer.py)
args: [--config=pyproject.toml]
- repo: https://github.com/codespell-project/codespell
rev: v2.2.1
rev: v2.2.2
hooks:
- id: codespell
# security
1 change: 0 additions & 1 deletion .pylintrc
@@ -22,7 +22,6 @@ disable=fixme,
# we'll probably never enable these checks
invalid-name,
import-error,
no-self-use,

# disable code-complexity checks for now
# TODO: should we configure the thresholds for these rather than just disable?
23 changes: 16 additions & 7 deletions merlin/core/dispatch.py
@@ -27,16 +27,16 @@
from merlin.core.compat import HAS_GPU
from merlin.core.protocols import DataFrameLike, DictLike, SeriesLike

cp = None
cudf = None
cp = None
rmm = None

if HAS_GPU:
try:
import cudf
import cupy as cp
import cudf # type: ignore[no-redef]
import cupy as cp # type: ignore[no-redef]
import dask_cudf
import rmm
import rmm # type: ignore[no-redef]
from cudf.core.column import as_column, build_column

try:
@@ -285,7 +285,7 @@ def list_val_dtype(ser: SeriesLike) -> np.dtype:
The dtype of the innermost elements
"""
if is_list_dtype(ser):
if HAS_GPU and isinstance(ser, cudf.Series):
if cudf is not None and isinstance(ser, cudf.Series):
if is_list_dtype(ser):
ser = ser.list.leaves
return ser.dtype
@@ -386,7 +386,14 @@ def read_dispatch(df: DataFrameLike = None, cpu=None, collection=False, fmt="par
if cpu or isinstance(df, pd.DataFrame) or not HAS_GPU:
_mod = dd if collection else pd
else:
_mod = dask_cudf if collection else cudf.io
if collection:
_mod = dask_cudf
elif cudf is not None:
_mod = cudf.io
else:
raise ValueError(
"Unable to load cudf. Please check your environment GPU and cudf available."
)
_attr = "read_csv" if fmt == "csv" else "read_parquet"
return getattr(_mod, _attr)

@@ -405,8 +412,10 @@ def parquet_writer_dispatch(df: DataFrameLike, path=None, **kwargs):
_cls = pq.ParquetWriter
if path:
_args.append(pa.Table.from_pandas(df, preserve_index=False).schema)
else:
elif cudf is not None:
_cls = cudf.io.parquet.ParquetWriter
else:
ValueError("Unable to load cudf. Please check your environment GPU and cudf available.")

if not path:
return _cls
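For readers following the dispatch changes above: the module now initializes `cudf`, `cp`, and `rmm` to `None` and rebinds them only when the GPU stack imports cleanly, so call sites check `cudf is not None` instead of assuming `HAS_GPU` implies a working cudf. A minimal standalone sketch of the same guarded-import pattern (the `read_any` helper below is hypothetical, not part of merlin-core):

```python
# Sketch of the optional-import pattern used in merlin/core/dispatch.py.
# The read_any() helper is illustrative only.
import pandas as pd

cudf = None  # rebound only if the GPU stack is importable
try:
    import cudf  # type: ignore[no-redef]
except ImportError:
    pass


def read_any(path: str, cpu: bool = False):
    """Prefer cudf when it imported cleanly; otherwise fall back to pandas."""
    if not cpu and cudf is None:
        raise ValueError("Unable to load cudf; pass cpu=True to use pandas instead.")
    mod = pd if cpu else cudf
    return mod.read_parquet(path)
```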
123 changes: 102 additions & 21 deletions merlin/core/utils.py
@@ -22,8 +22,10 @@
import warnings
import zipfile
from contextvars import ContextVar
from typing import Any, Callable, Optional

import dask
import distributed
from dask.dataframe.optimize import optimize as dd_optimize
from dask.distributed import Client, get_client
from tqdm import tqdm
@@ -40,6 +42,25 @@


def pynvml_mem_size(kind="total", index=0):
"""Get Memory Info for device.
Parameters
----------
kind : str, optional
Either "free" or "total", by default "total"
index : int, optional
Device Index, by default 0
Returns
-------
int
Either free or total memory on device depending on the kind parameter.
Raises
------
ValueError
When kind is not one of {"free", "total"}
"""
import pynvml

pynvml.nvmlInit()
@@ -49,13 +70,31 @@ def pynvml_mem_size(kind="total", index=0):
elif kind == "total":
size = int(pynvml.nvmlDeviceGetMemoryInfo(pynvml.nvmlDeviceGetHandleByIndex(index)).total)
else:
raise ValueError("{0} not a supported option for device_mem_size.".format(kind))
raise ValueError(f"{kind} not a supported option for device_mem_size.")
pynvml.nvmlShutdown()
return size


def device_mem_size(kind="total", cpu=False):
"""Get Memory Info for either CPU or GPU.
Parameters
----------
kind : str, optional
Either "total" or "free", by default "total"
cpu : bool, optional
Specifies whether to check memory for CPU or GPU, by default False
Returns
-------
int
Free or total memory on device
Raises
------
ValueError
When kind is provided with an unsupported value.
"""
# Use psutil (if available) for cpu mode
if cpu and psutil:
if kind == "total":
@@ -68,7 +107,7 @@ def device_mem_size(kind="total", cpu=False):
return int(1e9)

if kind not in ["free", "total"]:
raise ValueError("{0} not a supported option for device_mem_size.".format(kind))
raise ValueError(f"{kind} not a supported option for device_mem_size.")
try:
if kind == "free":
return int(cuda.current_context().get_memory_info()[0])
@@ -79,7 +118,7 @@ def device_mem_size(kind="total", cpu=False):
# Not using NVML "free" memory, because it will not include RMM-managed memory
warnings.warn("get_memory_info is not supported. Using total device memory from NVML.")
size = pynvml_mem_size(kind="total", index=0)
return size
return size


def get_rmm_size(size):
@@ -129,18 +168,39 @@ def report(chunk, chunksize, total):


def ensure_optimize_dataframe_graph(ddf=None, dsk=None, keys=None):
# Perform HLG DataFrame optimizations
#
# If `ddf` is specified, an optimized Dataframe
# collection will be returned. If `dsk` and `keys`
# are specified, an optimized graph will be returned.
#
# These optimizations are performed automatically
# when a DataFrame collection is computed/persisted,
# but they are NOT always performed when statistics
# are computed. The purpose of this utility is to
# ensure that the Dataframe-based optimizations are
# always applied.
"""Perform HLG DataFrame optimizations
If `ddf` is specified, an optimized Dataframe
collection will be returned. If `dsk` and `keys`
are specified, an optimized graph will be returned.
These optimizations are performed automatically
when a DataFrame collection is computed/persisted,
but they are NOT always performed when statistics
are computed. The purpose of this utility is to
ensure that the Dataframe-based optimizations are
always applied.
Parameters
----------
ddf : dask_cudf.DataFrame, optional
The dataframe to optimize, by default None
dsk : dask.highlevelgraph.HighLevelGraph, optional
Dask high level graph, by default None
keys : List[str], optional
The keys to optimize, by default None
Returns
-------
Union[dask_cudf.DataFrame, dask.highlevelgraph.HighLevelGraph]
A dask_cudf DataFrame or dask HighLevelGraph depending
on the parameters provided.
Raises
------
ValueError
If `ddf` is not provided and either `dsk` or `keys` is None.
"""

if ddf is None:
if dsk is None or keys is None:
@@ -394,7 +454,8 @@ def set_client_deprecated(client, caller_str):


def set_dask_client(client="auto", new_cluster=None, force_new=False, **cluster_options):
"""Set the Dask-Distributed client
"""Set the Dask-Distributed client.
Parameters
-----------
client : {"auto", None} or `dask.distributed.Client`
@@ -454,11 +515,18 @@ def set_dask_client(client="auto", new_cluster=None, force_new=False, **cluster_
return None if active == "auto" else active


def global_dask_client():
def global_dask_client() -> Optional[distributed.Client]:
"""Get Global Dask client if it's been set.
Returns
-------
Optional[distributed.Client]
The global client.
"""
# First, check _merlin_dask_client
merlin_client = _merlin_dask_client.get()
if merlin_client and merlin_client != "auto":
if merlin_client.cluster and merlin_client.cluster.workers:
if merlin_client.cluster and merlin_client.cluster.workers: # type: ignore
# Active Dask client already known
return merlin_client
else:
@@ -471,14 +539,27 @@ def global_dask_client():
set_dask_client(get_client())
return _merlin_dask_client.get()
except ValueError:
# no global client found
pass
# Catch-all
return None


def run_on_worker(func, *args, **kwargs):
# Run a function on a Dask worker using `delayed`
# execution (if a Dask client is detected)
def run_on_worker(func: Callable, *args, **kwargs) -> Any:
"""Run a function on a Dask worker using `delayed`
execution (if a Dask client is detected)
Parameters
----------
func : Callable
The function to run
Returns
-------
Any
The result of the function call with supplied arguments
"""

if global_dask_client():
# There is a specified or global Dask client. Use it
return dask.delayed(func)(*args, **kwargs).compute()
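Taken together, the newly documented utilities can be exercised roughly as follows; this is a hedged usage sketch (the local cluster sizing and the worker function are illustrative, not taken from the PR):

```python
# Usage sketch for the utilities documented above; assumes dask.distributed
# is installed alongside merlin-core.
from dask.distributed import Client, LocalCluster

from merlin.core.utils import (
    device_mem_size,
    global_dask_client,
    run_on_worker,
    set_dask_client,
)

# CPU path uses psutil when available, otherwise a conservative default
total_bytes = device_mem_size(kind="total", cpu=True)

cluster = LocalCluster(n_workers=2)       # illustrative cluster
set_dask_client(Client(cluster))          # register the client with merlin
client = global_dask_client()             # Optional[distributed.Client]

# Runs via dask.delayed because a client is registered
result = run_on_worker(lambda x: x + 1, 41)
print(total_bytes, client, result)
```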
5 changes: 2 additions & 3 deletions merlin/dag/base_operator.py
@@ -186,7 +186,6 @@ def validate_schemas(
strict_dtypes : Boolean, optional
Enables strict checking for column dtype matching if True, by default False
"""
...

def transform(
self, col_selector: ColumnSelector, transformable: Transformable
@@ -313,8 +312,8 @@ def output_column_names(self, col_selector: ColumnSelector) -> ColumnSelector:

@property
def dependencies(self) -> List[Union[str, Any]]:
"""Defines an optional list of column dependencies for this operator. This lets you consume columns
that aren't part of the main transformation workflow.
"""Defines an optional list of column dependencies for this operator.
This lets you consume columns that aren't part of the main transformation workflow.
Returns
-------
36 changes: 32 additions & 4 deletions merlin/dag/node.py
@@ -67,7 +67,14 @@ def selector(self, sel):

# These methods must maintain grouping
def add_dependency(
self, dep: Union[str, ColumnSelector, "Node", List[Union[str, "Node", ColumnSelector]]]
self,
dep: Union[
str,
List[str],
ColumnSelector,
"Node",
List[Union[str, List[str], "Node", ColumnSelector]],
],
):
"""
Adding a dependency node to this node
@@ -90,7 +97,14 @@ def add_dependency(
self.dependencies.append(dep_node)

def add_parent(
self, parent: Union[str, ColumnSelector, "Node", List[Union[str, "Node", ColumnSelector]]]
self,
parent: Union[
str,
List[str],
ColumnSelector,
"Node",
List[Union[str, List[str], "Node", ColumnSelector]],
],
):
"""
Adding a parent node to this node
@@ -111,7 +125,14 @@ def add_parent(
self.parents.extend(parent_nodes)

def add_child(
self, child: Union[str, ColumnSelector, "Node", List[Union[str, "Node", ColumnSelector]]]
self,
child: Union[
str,
List[str],
ColumnSelector,
"Node",
List[Union[str, List[str], "Node", ColumnSelector]],
],
):
"""
Adding a child node to this node
@@ -132,7 +153,14 @@ def add_child(
self.children.extend(child_nodes)

def remove_child(
self, child: Union[str, ColumnSelector, "Node", List[Union[str, "Node", ColumnSelector]]]
self,
child: Union[
str,
List[str],
ColumnSelector,
"Node",
List[Union[str, List[str], "Node", ColumnSelector]],
],
):
"""
Removing a child node from this node
2 changes: 1 addition & 1 deletion merlin/dag/selector.py
@@ -38,7 +38,7 @@ class ColumnSelector:

def __init__(
self,
names: List[str] = None,
names: Union[str, List[str]] = None,
subgroups: List["ColumnSelector"] = None,
tags: List[Union[Tags, str]] = None,
):
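The widened `names` annotation documents that a bare column name is accepted as well as a list; a small hedged sketch (column names are hypothetical):

```python
from merlin.dag.selector import ColumnSelector

# Both forms are covered by the new Union[str, List[str]] annotation on `names`;
# runtime behaviour is assumed unchanged by this PR.
single = ColumnSelector("user_id")
several = ColumnSelector(["user_id", "item_id"])
```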
2 changes: 1 addition & 1 deletion merlin/io/avro.py
@@ -33,7 +33,7 @@ class AvroDatasetEngine(DatasetEngine):
def __init__(self, paths, part_size, storage_options=None, cpu=False, **kwargs):
# pylint: disable=access-member-before-definition
super().__init__(paths, part_size, storage_options=storage_options, cpu=cpu)
if kwargs != {}:
if kwargs:
raise ValueError("Unexpected AvroDatasetEngine argument(s).")
self.blocksize = part_size

7 changes: 1 addition & 6 deletions merlin/io/dask.py
@@ -110,12 +110,7 @@ def _get_partition_groups(df, partition_cols, fs, output_path, filename):
keys = tuple(sub_df[col].iloc[0] for col in partition_cols)
if not isinstance(keys, tuple):
keys = (keys,)
subdir = fs.sep.join(
[
"{colname}={value}".format(colname=name, value=val)
for name, val in zip(partition_cols, keys)
]
)
subdir = fs.sep.join([f"{name}={val}" for name, val in zip(partition_cols, keys)])
prefix = fs.sep.join([output_path, subdir])
fs.mkdirs(prefix, exist_ok=True)
fns.append(fs.sep.join([subdir, filename]))
4 changes: 2 additions & 2 deletions merlin/io/hugectr.py
@@ -47,9 +47,9 @@ def _write_table(self, idx, data):
nnz = np.intc(1)
np_cats = data[self.cats].to_pandas().astype(np.uintc).to_numpy()
# Write all the data samples
for i, _ in enumerate(np_label):
for i, label in enumerate(np_label):
# Write Label
self.data_writers[idx].write(np_label[i].tobytes())
self.data_writers[idx].write(label.tobytes())
# Write conts (HugeCTR: dense)
self.data_writers[idx].write(np_conts[i].tobytes())
# Write cats (HugeCTR: Slots)
10 changes: 5 additions & 5 deletions merlin/io/parquet.py
@@ -1029,7 +1029,7 @@ def _write_thread(self):
self.queue.task_done()

@classmethod
def write_special_metadata(cls, md, fs, out_dir):
def write_special_metadata(cls, data, fs, out_dir):
"""Write global _metadata file"""
raise (NotImplementedError)

@@ -1071,10 +1071,10 @@ def _write_table(self, idx, data):
writer.write_table(data)

@classmethod
def write_special_metadata(cls, md, fs, out_dir):
def write_special_metadata(cls, data, fs, out_dir):
# Sort metadata by file name and convert list of
# tuples to a list of metadata byte-blobs
md_list = [m[1] for m in sorted(list(md.items()), key=lambda x: natural_sort_key(x[0]))]
md_list = [m[1] for m in sorted(list(data.items()), key=lambda x: natural_sort_key(x[0]))]

# Aggregate metadata and write _metadata file
_write_pq_metadata_file_cudf(md_list, fs, out_dir)
@@ -1143,10 +1143,10 @@ def _write_table(self, idx, data):
writer.write_table(table, row_group_size=self._get_row_group_size(data))

@classmethod
def write_special_metadata(cls, md, fs, out_dir):
def write_special_metadata(cls, data, fs, out_dir):
# Sort metadata by file name and convert list of
# tuples to a list of metadata byte-blobs
md_list = [m[1] for m in sorted(list(md.items()), key=lambda x: natural_sort_key(x[0]))]
md_list = [m[1] for m in sorted(list(data.items()), key=lambda x: natural_sort_key(x[0]))]

# Aggregate metadata and write _metadata file
_write_pq_metadata_file_pyarrow(md_list, fs, out_dir)
4 changes: 2 additions & 2 deletions merlin/io/worker.py
@@ -51,7 +51,7 @@ def _get_worker_cache(name):
except ValueError:
# There is no dask.distributed worker.
# Assume client/worker are same process
global _WORKER_CACHE # pylint: disable=global-statement
global _WORKER_CACHE # pylint: disable=global-variable-not-assigned
if name not in _WORKER_CACHE:
_WORKER_CACHE[name] = {}
return _WORKER_CACHE[name]
@@ -124,7 +124,7 @@ def clean_worker_cache(name=None):
worker = get_worker()
except ValueError:
global _WORKER_CACHE # pylint: disable=global-statement
if _WORKER_CACHE != {}:
if _WORKER_CACHE:
if name:
del _WORKER_CACHE[name]
else:
9 changes: 7 additions & 2 deletions merlin/schema/schema.py
@@ -34,6 +34,11 @@ class ColumnQuantity(Enum):

@dataclass(frozen=True)
class Domain:
"""Describes an integer or float domain.
Can be partially specified, with any of name, min, or max.
"""

min: Optional[Union[int, float]] = None
max: Optional[Union[int, float]] = None
name: Optional[str] = None
@@ -55,7 +60,7 @@ class ColumnSchema:
"""A schema containing metadata of a dataframe column."""

name: Text
tags: Optional[TagSet] = field(default_factory=TagSet)
tags: Optional[Union[TagSet, List[Union[str, Tags]]]] = field(default_factory=TagSet)
properties: Optional[Dict] = field(default_factory=dict)
dtype: Optional[object] = None
is_list: Optional[bool] = None
@@ -179,7 +184,7 @@ def with_tags(self, tags: Union[str, Tags]) -> "ColumnSchema":
"""
return ColumnSchema(
self.name,
tags=self.tags.override(tags),
tags=self.tags.override(tags), # type: ignore
properties=self.properties,
dtype=self.dtype,
is_list=self.is_list,
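As the widened `tags` field and the new `Domain` docstring suggest, a column schema can be declared with a plain list of tags and a partially specified domain; a hedged sketch (the column name is hypothetical, and `Domain` is imported from the module where the diff defines it):

```python
from merlin.schema import ColumnSchema, Tags
from merlin.schema.schema import Domain

# tags may be given as a plain list of strings/Tags rather than a TagSet
col = ColumnSchema("item_id", tags=[Tags.CATEGORICAL, "item"])
col = col.with_tags("target")  # with_tags accepts a str or Tags value

# Domain can be partially specified with any of name, min, max
age_domain = Domain(min=0, max=120, name="age")
```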
2 changes: 1 addition & 1 deletion merlin/schema/tags.py
@@ -144,7 +144,7 @@ def _normalize_tags(self, tags) -> Set[Tags]:

for tag in tag_set:
atomized_tags.add(tag)
if tag in COMPOUND_TAGS.keys():
if tag in COMPOUND_TAGS:
warnings.warn(
f"Compound tags like {tag} have been deprecated "
"and will be removed in a future version. "
7 changes: 7 additions & 0 deletions mypy.ini
@@ -0,0 +1,7 @@
[mypy]
python_version = 3.8
warn_unused_configs = True
exclude = versioneer.py
ignore_missing_imports = True
show_traceback = True
strict_optional = False
8 changes: 0 additions & 8 deletions requirements-dev.txt
@@ -1,14 +1,6 @@
# packages necessary to run tests and push PRs
# assumes requirements for merlin-core are already installed

black==22.3.0
click<8.1.0
flake8==3.9.2
isort==5.9.3
pylint==2.7.4
bandit==1.7.0
interrogate==1.5.0
pytest>=5
pytest-cov>=2
pytest-xdist
codespell
12 changes: 0 additions & 12 deletions tox.ini
@@ -112,18 +112,6 @@ commands =
; this runs the tests then removes the Merlin repo directory whether the tests work or fail
python -m pytest Merlin-{env:GIT_COMMIT}/tests/unit

[testenv:lint]
; Runs in: Github Actions
; Runs all lint/code checks and fails the PR if there are errors.
; Install pre-commit-hooks to run these tests during development.
deps = -rrequirements-dev.txt
commands =
flake8 setup.py merlin/ tests/
black --check --diff merlin tests
pylint merlin tests
isort -c merlin tests --skip .tox
interrogate merlin tests --config=pyproject.toml
codespell merlin tests --skip .tox

[testenv:docs]
; Runs in: Github Actions
1 change: 1 addition & 0 deletions versioneer.py
@@ -1,3 +1,4 @@
# pylint: skip-file
# Version: 0.23

"""The Versioneer - like a rocketeer, but for versions.