diff --git a/.github/workflows/cpu-ci.yml b/.github/workflows/cpu-ci.yml index 9eb426219..e8526221d 100644 --- a/.github/workflows/cpu-ci.yml +++ b/.github/workflows/cpu-ci.yml @@ -32,9 +32,6 @@ jobs: - name: Install and upgrade python packages run: | python -m pip install --upgrade pip setuptools==59.4.0 wheel tox - - name: Lint with flake8, black, isort, interrogate, codespell - run: | - tox -e lint - name: Run tests run: | ref_type=${{ github.ref_type }} diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml new file mode 100644 index 000000000..9159e133a --- /dev/null +++ b/.github/workflows/lint.yaml @@ -0,0 +1,16 @@ +name: lint + +on: + pull_request: + push: + branches: [main] + tags: + - v* + +jobs: + pre-commit: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-python@v4 + - uses: pre-commit/action@v3.0.0 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 9e4e1fa96..f19f8c1bd 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -5,30 +5,30 @@ repos: hooks: - id: absolufy-imports - repo: https://github.com/timothycrosley/isort - rev: 5.10.1 + rev: 5.11.2 hooks: - id: isort additional_dependencies: [toml] - exclude: examples/* + exclude: ^examples/ # types - repo: https://github.com/pre-commit/mirrors-mypy - rev: 'v0.971' + rev: 'v0.991' hooks: - id: mypy language_version: python3 - args: [--no-strict-optional, --ignore-missing-imports, --show-traceback, --install-types, --non-interactive] - exclude: docs/* + args: [--non-interactive, --install-types, --namespace-packages, --explicit-package-bases] + exclude: ^docs/ # code style - repo: https://github.com/python/black - rev: 22.6.0 + rev: 22.12.0 hooks: - id: black - repo: https://github.com/pycqa/pylint - rev: v2.14.5 + rev: v2.15.8 hooks: - id: pylint - - repo: https://gitlab.com/pycqa/flake8 - rev: 3.9.2 + - repo: https://github.com/pycqa/flake8 + rev: 6.0.0 hooks: - id: flake8 # notebooks @@ -45,7 +45,7 @@ repos: exclude: ^(build|docs|merlin/io|tests|setup.py|versioneer.py) args: [--config=pyproject.toml] - repo: https://github.com/codespell-project/codespell - rev: v2.2.1 + rev: v2.2.2 hooks: - id: codespell # security diff --git a/.pylintrc b/.pylintrc index 19889ca09..73014f4e4 100644 --- a/.pylintrc +++ b/.pylintrc @@ -22,7 +22,6 @@ disable=fixme, # we'll probably never enable these checks invalid-name, import-error, - no-self-use, # disable code-complexity checks for now # TODO: should we configure the thresholds for these rather than just disable? 
diff --git a/merlin/core/dispatch.py b/merlin/core/dispatch.py index fc4ec6529..6d63da4e3 100644 --- a/merlin/core/dispatch.py +++ b/merlin/core/dispatch.py @@ -27,16 +27,16 @@ from merlin.core.compat import HAS_GPU from merlin.core.protocols import DataFrameLike, DictLike, SeriesLike -cp = None cudf = None +cp = None rmm = None if HAS_GPU: try: - import cudf - import cupy as cp + import cudf # type: ignore[no-redef] + import cupy as cp # type: ignore[no-redef] import dask_cudf - import rmm + import rmm # type: ignore[no-redef] from cudf.core.column import as_column, build_column try: @@ -285,7 +285,7 @@ def list_val_dtype(ser: SeriesLike) -> np.dtype: The dtype of the innermost elements """ if is_list_dtype(ser): - if HAS_GPU and isinstance(ser, cudf.Series): + if cudf is not None and isinstance(ser, cudf.Series): if is_list_dtype(ser): ser = ser.list.leaves return ser.dtype @@ -386,7 +386,14 @@ def read_dispatch(df: DataFrameLike = None, cpu=None, collection=False, fmt="par if cpu or isinstance(df, pd.DataFrame) or not HAS_GPU: _mod = dd if collection else pd else: - _mod = dask_cudf if collection else cudf.io + if collection: + _mod = dask_cudf + elif cudf is not None: + _mod = cudf.io + else: + raise ValueError( + "Unable to load cudf. Check that your environment has a GPU and cudf installed." + ) _attr = "read_csv" if fmt == "csv" else "read_parquet" return getattr(_mod, _attr) @@ -405,8 +412,10 @@ def parquet_writer_dispatch(df: DataFrameLike, path=None, **kwargs): _cls = pq.ParquetWriter if path: _args.append(pa.Table.from_pandas(df, preserve_index=False).schema) - else: + elif cudf is not None: _cls = cudf.io.parquet.ParquetWriter + else: + raise ValueError( + "Unable to load cudf. Check that your environment has a GPU and cudf installed." + ) if not path: return _cls diff --git a/merlin/core/utils.py b/merlin/core/utils.py index 5cdf53e92..e43253da9 100644 --- a/merlin/core/utils.py +++ b/merlin/core/utils.py @@ -22,8 +22,10 @@ import warnings import zipfile from contextvars import ContextVar +from typing import Any, Callable, Optional import dask +import distributed from dask.dataframe.optimize import optimize as dd_optimize from dask.distributed import Client, get_client from tqdm import tqdm @@ -40,6 +42,25 @@ def pynvml_mem_size(kind="total", index=0): + """Get Memory Info for device. + + Parameters + ---------- + kind : str, optional + Either "free" or "total", by default "total" + index : int, optional + Device Index, by default 0 + + Returns + ------- + int + Either free or total memory on device depending on the kind parameter. + + Raises + ------ + ValueError + When kind is not one of {"free", "total"} + """ import pynvml pynvml.nvmlInit() @@ -49,13 +70,31 @@ def pynvml_mem_size(kind="total", index=0): elif kind == "total": size = int(pynvml.nvmlDeviceGetMemoryInfo(pynvml.nvmlDeviceGetHandleByIndex(index)).total) else: - raise ValueError("{0} not a supported option for device_mem_size.".format(kind)) + raise ValueError(f"{kind} not a supported option for device_mem_size.") pynvml.nvmlShutdown() return size def device_mem_size(kind="total", cpu=False): + """Get Memory Info for either CPU or GPU. + + Parameters + ---------- + kind : str, optional + Either "total" or "free", by default "total" + cpu : bool, optional + Specifies whether to check memory for CPU or GPU, by default False + + Returns + ------- + int + Free or total memory on device + + Raises + ------ + ValueError + When kind is provided with an unsupported value.
+ """ # Use psutil (if available) for cpu mode if cpu and psutil: if kind == "total": @@ -68,7 +107,7 @@ def device_mem_size(kind="total", cpu=False): return int(1e9) if kind not in ["free", "total"]: - raise ValueError("{0} not a supported option for device_mem_size.".format(kind)) + raise ValueError(f"{kind} not a supported option for device_mem_size.") try: if kind == "free": return int(cuda.current_context().get_memory_info()[0]) @@ -79,7 +118,7 @@ def device_mem_size(kind="total", cpu=False): # Not using NVML "free" memory, because it will not include RMM-managed memory warnings.warn("get_memory_info is not supported. Using total device memory from NVML.") size = pynvml_mem_size(kind="total", index=0) - return size + return size def get_rmm_size(size): @@ -129,18 +168,39 @@ def report(chunk, chunksize, total): def ensure_optimize_dataframe_graph(ddf=None, dsk=None, keys=None): - # Perform HLG DataFrame optimizations - # - # If `ddf` is specified, an optimized Dataframe - # collection will be returned. If `dsk` and `keys` - # are specified, an optimized graph will be returned. - # - # These optimizations are performed automatically - # when a DataFrame collection is computed/persisted, - # but they are NOT always performed when statistics - # are computed. The purpose of this utility is to - # ensure that the Dataframe-based optimizations are - # always applied. + """Perform HLG DataFrame optimizations + + If `ddf` is specified, an optimized Dataframe + collection will be returned. If `dsk` and `keys` + are specified, an optimized graph will be returned. + + These optimizations are performed automatically + when a DataFrame collection is computed/persisted, + but they are NOT always performed when statistics + are computed. The purpose of this utility is to + ensure that the Dataframe-based optimizations are + always applied. + + Parameters + ---------- + ddf : dask_cudf.DataFrame, optional + The dataframe to optimize, by default None + dsk : dask.highlevelgraph.HighLevelGraph, optional + Dask high level graph, by default None + keys : List[str], optional + The keys to optimize, by default None + + Returns + ------- + Union[dask_cudf.DataFrame, dask.highlevelgraph.HighLevelGraph] + A dask_cudf DataFrame or dask HighLevelGraph depending + on the parameters provided. + + Raises + ------ + ValueError + If ddf is not provided and one of dsk or keys are None. + """ if ddf is None: if dsk is None or keys is None: @@ -394,7 +454,8 @@ def set_client_deprecated(client, caller_str): def set_dask_client(client="auto", new_cluster=None, force_new=False, **cluster_options): - """Set the Dask-Distributed client + """Set the Dask-Distributed client. + Parameters ----------- client : {"auto", None} or `dask.distributed.Client` @@ -454,11 +515,18 @@ def set_dask_client(client="auto", new_cluster=None, force_new=False, **cluster_ return None if active == "auto" else active -def global_dask_client(): +def global_dask_client() -> Optional[distributed.Client]: + """Get Global Dask client if it's been set. + + Returns + ------- + Optional[distributed.Client] + The global client. 
+ """ # First, check _merlin_dask_client merlin_client = _merlin_dask_client.get() if merlin_client and merlin_client != "auto": - if merlin_client.cluster and merlin_client.cluster.workers: + if merlin_client.cluster and merlin_client.cluster.workers: # type: ignore # Active Dask client already known return merlin_client else: @@ -471,14 +539,27 @@ def global_dask_client(): set_dask_client(get_client()) return _merlin_dask_client.get() except ValueError: + # no global client found pass # Catch-all return None -def run_on_worker(func, *args, **kwargs): - # Run a function on a Dask worker using `delayed` - # execution (if a Dask client is detected) +def run_on_worker(func: Callable, *args, **kwargs) -> Any: + """Run a function on a Dask worker using `delayed` + execution (if a Dask client is detected) + + Parameters + ---------- + func : Callable + The function to run + + Returns + ------- + Any + The result of the function call with supplied arguments + """ + if global_dask_client(): # There is a specified or global Dask client. Use it return dask.delayed(func)(*args, **kwargs).compute() diff --git a/merlin/dag/base_operator.py b/merlin/dag/base_operator.py index ee59eedf3..85e96117a 100644 --- a/merlin/dag/base_operator.py +++ b/merlin/dag/base_operator.py @@ -186,7 +186,6 @@ def validate_schemas( strict_dtypes : Boolean, optional Enables strict checking for column dtype matching if True, by default False """ - ... def transform( self, col_selector: ColumnSelector, transformable: Transformable @@ -313,8 +312,8 @@ def output_column_names(self, col_selector: ColumnSelector) -> ColumnSelector: @property def dependencies(self) -> List[Union[str, Any]]: - """Defines an optional list of column dependencies for this operator. This lets you consume columns - that aren't part of the main transformation workflow. + """Defines an optional list of column dependencies for this operator. + This lets you consume columns that aren't part of the main transformation workflow. 
Returns ------- diff --git a/merlin/dag/node.py b/merlin/dag/node.py index f7fd4551a..10a226b9f 100644 --- a/merlin/dag/node.py +++ b/merlin/dag/node.py @@ -67,7 +67,14 @@ def selector(self, sel): # These methods must maintain grouping def add_dependency( - self, dep: Union[str, ColumnSelector, "Node", List[Union[str, "Node", ColumnSelector]]] + self, + dep: Union[ + str, + List[str], + ColumnSelector, + "Node", + List[Union[str, List[str], "Node", ColumnSelector]], + ], ): """ Adding a dependency node to this node @@ -90,7 +97,14 @@ def add_dependency( self.dependencies.append(dep_node) def add_parent( - self, parent: Union[str, ColumnSelector, "Node", List[Union[str, "Node", ColumnSelector]]] + self, + parent: Union[ + str, + List[str], + ColumnSelector, + "Node", + List[Union[str, List[str], "Node", ColumnSelector]], + ], ): """ Adding a parent node to this node @@ -111,7 +125,14 @@ def add_parent( self.parents.extend(parent_nodes) def add_child( - self, child: Union[str, ColumnSelector, "Node", List[Union[str, "Node", ColumnSelector]]] + self, + child: Union[ + str, + List[str], + ColumnSelector, + "Node", + List[Union[str, List[str], "Node", ColumnSelector]], + ], ): """ Adding a child node to this node @@ -132,7 +153,14 @@ def add_child( self.children.extend(child_nodes) def remove_child( - self, child: Union[str, ColumnSelector, "Node", List[Union[str, "Node", ColumnSelector]]] + self, + child: Union[ + str, + List[str], + ColumnSelector, + "Node", + List[Union[str, List[str], "Node", ColumnSelector]], + ], ): """ Removing a child node from this node diff --git a/merlin/dag/selector.py b/merlin/dag/selector.py index 507d05c6f..6de203d28 100644 --- a/merlin/dag/selector.py +++ b/merlin/dag/selector.py @@ -38,7 +38,7 @@ class ColumnSelector: def __init__( self, - names: List[str] = None, + names: Union[str, List[str]] = None, subgroups: List["ColumnSelector"] = None, tags: List[Union[Tags, str]] = None, ): diff --git a/merlin/io/avro.py b/merlin/io/avro.py index 97341e07c..ce4bcefca 100644 --- a/merlin/io/avro.py +++ b/merlin/io/avro.py @@ -33,7 +33,7 @@ class AvroDatasetEngine(DatasetEngine): def __init__(self, paths, part_size, storage_options=None, cpu=False, **kwargs): # pylint: disable=access-member-before-definition super().__init__(paths, part_size, storage_options=storage_options, cpu=cpu) - if kwargs != {}: + if kwargs: raise ValueError("Unexpected AvroDatasetEngine argument(s).") self.blocksize = part_size diff --git a/merlin/io/dask.py b/merlin/io/dask.py index 86277eca3..9d1ece6b6 100644 --- a/merlin/io/dask.py +++ b/merlin/io/dask.py @@ -110,12 +110,7 @@ def _get_partition_groups(df, partition_cols, fs, output_path, filename): keys = tuple(sub_df[col].iloc[0] for col in partition_cols) if not isinstance(keys, tuple): keys = (keys,) - subdir = fs.sep.join( - [ - "{colname}={value}".format(colname=name, value=val) - for name, val in zip(partition_cols, keys) - ] - ) + subdir = fs.sep.join([f"{name}={val}" for name, val in zip(partition_cols, keys)]) prefix = fs.sep.join([output_path, subdir]) fs.mkdirs(prefix, exist_ok=True) fns.append(fs.sep.join([subdir, filename])) diff --git a/merlin/io/hugectr.py b/merlin/io/hugectr.py index a0f81cbb3..5d0829f78 100644 --- a/merlin/io/hugectr.py +++ b/merlin/io/hugectr.py @@ -47,9 +47,9 @@ def _write_table(self, idx, data): nnz = np.intc(1) np_cats = data[self.cats].to_pandas().astype(np.uintc).to_numpy() # Write all the data samples - for i, _ in enumerate(np_label): + for i, label in enumerate(np_label): # Write Label - 
self.data_writers[idx].write(np_label[i].tobytes()) + self.data_writers[idx].write(label.tobytes()) # Write conts (HugeCTR: dense) self.data_writers[idx].write(np_conts[i].tobytes()) # Write cats (HugeCTR: Slots) diff --git a/merlin/io/parquet.py b/merlin/io/parquet.py index 4e02794d9..a46182caa 100644 --- a/merlin/io/parquet.py +++ b/merlin/io/parquet.py @@ -1029,7 +1029,7 @@ def _write_thread(self): self.queue.task_done() @classmethod - def write_special_metadata(cls, md, fs, out_dir): + def write_special_metadata(cls, data, fs, out_dir): """Write global _metadata file""" raise (NotImplementedError) @@ -1071,10 +1071,10 @@ def _write_table(self, idx, data): writer.write_table(data) @classmethod - def write_special_metadata(cls, md, fs, out_dir): + def write_special_metadata(cls, data, fs, out_dir): # Sort metadata by file name and convert list of # tuples to a list of metadata byte-blobs - md_list = [m[1] for m in sorted(list(md.items()), key=lambda x: natural_sort_key(x[0]))] + md_list = [m[1] for m in sorted(list(data.items()), key=lambda x: natural_sort_key(x[0]))] # Aggregate metadata and write _metadata file _write_pq_metadata_file_cudf(md_list, fs, out_dir) @@ -1143,10 +1143,10 @@ def _write_table(self, idx, data): writer.write_table(table, row_group_size=self._get_row_group_size(data)) @classmethod - def write_special_metadata(cls, md, fs, out_dir): + def write_special_metadata(cls, data, fs, out_dir): # Sort metadata by file name and convert list of # tuples to a list of metadata byte-blobs - md_list = [m[1] for m in sorted(list(md.items()), key=lambda x: natural_sort_key(x[0]))] + md_list = [m[1] for m in sorted(list(data.items()), key=lambda x: natural_sort_key(x[0]))] # Aggregate metadata and write _metadata file _write_pq_metadata_file_pyarrow(md_list, fs, out_dir) diff --git a/merlin/io/worker.py b/merlin/io/worker.py index a09187d86..889c2e3c9 100644 --- a/merlin/io/worker.py +++ b/merlin/io/worker.py @@ -51,7 +51,7 @@ def _get_worker_cache(name): except ValueError: # There is no dask.distributed worker. # Assume client/worker are same process - global _WORKER_CACHE # pylint: disable=global-statement + global _WORKER_CACHE # pylint: disable=global-variable-not-assigned if name not in _WORKER_CACHE: _WORKER_CACHE[name] = {} return _WORKER_CACHE[name] @@ -124,7 +124,7 @@ def clean_worker_cache(name=None): worker = get_worker() except ValueError: global _WORKER_CACHE # pylint: disable=global-statement - if _WORKER_CACHE != {}: + if _WORKER_CACHE: if name: del _WORKER_CACHE[name] else: diff --git a/merlin/schema/schema.py b/merlin/schema/schema.py index df21f6511..2da00edab 100644 --- a/merlin/schema/schema.py +++ b/merlin/schema/schema.py @@ -34,6 +34,11 @@ class ColumnQuantity(Enum): @dataclass(frozen=True) class Domain: + """Describes an integer or float domain. + + Can be partially specified, using any of name, min, and max.
+ """ + min: Optional[Union[int, float]] = None max: Optional[Union[int, float]] = None name: Optional[str] = None @@ -55,7 +60,7 @@ class ColumnSchema: """A schema containing metadata of a dataframe column.""" name: Text - tags: Optional[TagSet] = field(default_factory=TagSet) + tags: Optional[Union[TagSet, List[Union[str, Tags]]]] = field(default_factory=TagSet) properties: Optional[Dict] = field(default_factory=dict) dtype: Optional[object] = None is_list: Optional[bool] = None @@ -179,7 +184,7 @@ def with_tags(self, tags: Union[str, Tags]) -> "ColumnSchema": """ return ColumnSchema( self.name, - tags=self.tags.override(tags), + tags=self.tags.override(tags), # type: ignore properties=self.properties, dtype=self.dtype, is_list=self.is_list, diff --git a/merlin/schema/tags.py b/merlin/schema/tags.py index 058c9b378..dbca5c066 100644 --- a/merlin/schema/tags.py +++ b/merlin/schema/tags.py @@ -144,7 +144,7 @@ def _normalize_tags(self, tags) -> Set[Tags]: for tag in tag_set: atomized_tags.add(tag) - if tag in COMPOUND_TAGS.keys(): + if tag in COMPOUND_TAGS: warnings.warn( f"Compound tags like {tag} have been deprecated " "and will be removed in a future version. " diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 000000000..6df6301ec --- /dev/null +++ b/mypy.ini @@ -0,0 +1,7 @@ +[mypy] +python_version = 3.8 +warn_unused_configs = True +exclude = versioneer.py +ignore_missing_imports = True +show_traceback = True +strict_optional = False \ No newline at end of file diff --git a/requirements-dev.txt b/requirements-dev.txt index b494b09d7..4ea03af57 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,14 +1,6 @@ # packages necessary to run tests and push PRs # assumes requirements for merlin-core are already installed -black==22.3.0 -click<8.1.0 -flake8==3.9.2 -isort==5.9.3 -pylint==2.7.4 -bandit==1.7.0 -interrogate==1.5.0 pytest>=5 pytest-cov>=2 pytest-xdist -codespell diff --git a/tox.ini b/tox.ini index 7b10ad180..bebb554ce 100644 --- a/tox.ini +++ b/tox.ini @@ -112,18 +112,6 @@ commands = ; this runs the tests then removes the Merlin repo directory whether the tests work or fail python -m pytest Merlin-{env:GIT_COMMIT}/tests/unit -[testenv:lint] -; Runs in: Github Actions -; Runs all lint/code checks and fails the PR if there are errors. -; Install pre-commit-hooks to run these tests during development. -deps = -rrequirements-dev.txt -commands = - flake8 setup.py merlin/ tests/ - black --check --diff merlin tests - pylint merlin tests - isort -c merlin tests --skip .tox - interrogate merlin tests --config=pyproject.toml - codespell merlin tests --skip .tox [testenv:docs] ; Runs in: Github Actions diff --git a/versioneer.py b/versioneer.py index 8db5abd56..e0734502d 100644 --- a/versioneer.py +++ b/versioneer.py @@ -1,3 +1,4 @@ +# pylint: skip-file # Version: 0.23 """The Versioneer - like a rocketeer, but for versions.