FIX #58 - Move MlflowPipelineHook args to PipelineML class #60

Merged
merged 1 commit on Sep 8, 2020
6 changes: 5 additions & 1 deletion CHANGELOG.md
@@ -4,7 +4,7 @@

### Added

- Add dataset ``MlflowMetricsDataSet`` for metrics logging ([#9](https://github.com/Galileo-Galilei/kedro-mlflow/issues/9)) and update documentation for metrics.`
- Add dataset ``MlflowMetricsDataSet`` for metrics logging ([#9](https://github.com/Galileo-Galilei/kedro-mlflow/issues/9)) and update documentation for metrics.

### Fixed

@@ -14,6 +14,10 @@
- Force all mlflow runs to be closed when a pipeline fails. This prevents further executions of the pipeline from being logged within the same mlflow run_id as the failing one. ([#10](https://github.com/Galileo-Galilei/kedro-mlflow/issues/10))
- Fix various documentation typos ([#34](https://github.com/Galileo-Galilei/kedro-mlflow/pull/34), [#35](https://github.com/Galileo-Galilei/kedro-mlflow/pull/35), [#36](https://github.com/Galileo-Galilei/kedro-mlflow/pull/36) and more)

### Changed

- Remove `conda_env` and `model_name` arguments from `MlflowPipelineHook` and add them to `PipelineML` and `pipeline_ml`. This is necessary for the hook auto-discovery coming in a future release, and it enables having several `PipelineML` pipelines in the same project. [#58](https://github.com/Galileo-Galilei/kedro-mlflow/pull/58)

## [0.2.1] - 2020-08-06

### Added
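For context on the "Changed" changelog entry, here is a minimal user-side migration sketch. The hook registration tuple, the two pipeline variables and the requirements path are hypothetical placeholders and are not part of this diff:

```python
from kedro_mlflow.framework.hooks.pipeline_hook import MlflowPipelineHook
from kedro_mlflow.pipeline.modular_pipeline_ml import pipeline_ml

# Before this PR, the packaging options were attached to the hook:
# hooks = (MlflowPipelineHook(conda_env="src/requirements.txt", model_name="model"),)

# After this PR, the hook takes no constructor arguments...
hooks = (MlflowPipelineHook(),)

# ...and the packaging options travel with the pipeline itself:
ml_pipeline = pipeline_ml(
    training=training_pipeline,        # hypothetical training Pipeline
    inference=inference_pipeline,      # hypothetical inference Pipeline
    input_name="raw_data",             # hypothetical dataset name
    conda_env="src/requirements.txt",  # hypothetical path, parsed into a conda env
    model_name="model",
)
```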
19 changes: 6 additions & 13 deletions kedro_mlflow/framework/hooks/pipeline_hook.py
@@ -17,14 +17,6 @@


class MlflowPipelineHook:
def __init__(
self,
conda_env: Union[str, Path, Dict[str, Any]] = None,
model_name: Union[str, None] = "model",
):
self.conda_env = _format_conda_env(conda_env)
self.model_name = model_name

@hook_impl
def after_catalog_created(
self,
@@ -137,12 +129,12 @@ def after_pipeline_run(
pipeline_catalog = pipeline.extract_pipeline_catalog(catalog)
artifacts = pipeline.extract_pipeline_artifacts(pipeline_catalog)
mlflow.pyfunc.log_model(
artifact_path=self.model_name,
artifact_path=pipeline.model_name,
python_model=KedroPipelineModel(
pipeline_ml=pipeline, catalog=pipeline_catalog
),
artifacts=artifacts,
conda_env=self.conda_env,
conda_env=_format_conda_env(pipeline.conda_env),
)
# Close the mlflow active run at the end of the pipeline to avoid interactions with further runs
mlflow.end_run()
@@ -218,16 +210,17 @@ def _format_conda_env(
"""Best effort to get dependecies of the project.

Keyword Arguments:
conda_env {[type]} -- It can be either :
conda_env {Union[str, Path, Dict[str, Any]]} -- It can be either :
- a path to a "requirements.txt": In this case
the packages are parsed and a conda env with
your current python_version and these dependencies is returned
- a path to an "environment.yml" : data is loaded and used as they are
- a Dict : used as the environment
- None (default: {None})
- None: a base conda environment with your current python version and your project version at training time.
Defaults to None.

Returns:
Dict[str, Any] -- [description]
Dict[str, Any] -- A dictionary which contains all the information needed to dump it to a conda environment.yml file.
"""
python_version = ".".join(
[
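As a reminder of the inputs `_format_conda_env` accepts (a sketch based on the docstring above; the helper is private and the paths are hypothetical, shown only for illustration):

```python
from kedro_mlflow.framework.hooks.pipeline_hook import _format_conda_env

# From a requirements.txt path: packages are parsed and combined
# with the current python version.
env = _format_conda_env("src/requirements.txt")

# From an environment.yml path: the file content is used as is.
env = _format_conda_env("src/environment.yml")

# From a dict: used directly as the environment.
env = _format_conda_env(
    {"python": "3.7.9", "dependencies": ["pandas>=1.0.0,<2.0.0"]}
)

# From None: a base environment with the current python version
# and the project version at training time.
env = _format_conda_env(None)
```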
54 changes: 52 additions & 2 deletions kedro_mlflow/pipeline/modular_pipeline_ml.py
@@ -1,12 +1,62 @@
from pathlib import Path
from typing import Any, Dict, Optional, Union

from kedro.pipeline import Pipeline

from kedro_mlflow.pipeline.pipeline_ml import PipelineML


def pipeline_ml(
training: Pipeline, inference: Pipeline, input_name: str = None,
training: Pipeline,
inference: Pipeline,
input_name: str = None,
conda_env: Optional[Union[str, Path, Dict[str, Any]]] = None,
model_name: Optional[str] = "model",
) -> PipelineML:
"""[summary]

Args:
training (Pipeline): The `Pipeline` object that creates
all mlflow artifacts for prediction (the model,
but also encoders, binarizers, tokenizers...).
These artifacts must be persisted in the catalog.yml.
inference (Pipeline): A `Pipeline` object which will be
stored in mlflow and use the output(s)
of the training pipeline (namely, the model)
to predict the outcome.
input_name (str, optional): The name of the dataset in
the catalog.yml which the model's user must provide
for prediction (i.e. the data). Defaults to None.
conda_env (Union[str, Path, Dict[str, Any]], optional):
The minimal conda environment necessary for the
inference `Pipeline`. It can be either :
- a path to a "requirements.txt": In this case
the packages are parsed and a conda env with
your current python_version and these
dependencies is returned.
- a path to an "environment.yml" : the file is
uploaded "as is".
- a Dict : used as the environment
- None: a base conda environment with your
current python version and your project
version at training time.
Defaults to None.
model_name (Union[str, None], optional): The name of
the folder where the model will be stored in
remote mlflow. Defaults to "model".

Returns:
PipelineML: A `PipelineML` which is automatically
discovered by the `MlflowPipelineHook` and
contains all the information for logging the
inference pipeline as a Mlflow Model.
"""

pipeline = PipelineML(
nodes=training.nodes, inference=inference, input_name=input_name
nodes=training.nodes,
inference=inference,
input_name=input_name,
conda_env=conda_env,
model_name=model_name,
)
return pipeline
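A small end-to-end sketch of the new signature; the node functions, tags and dataset names below are made up for illustration and assume a Kedro version compatible with this PR:

```python
from kedro.pipeline import Pipeline, node
from kedro_mlflow.pipeline.modular_pipeline_ml import pipeline_ml

def train_model(raw_data):
    # Hypothetical "training" step: returns a fitted artifact.
    return {"mean": sum(raw_data) / len(raw_data)}

def predict(model, raw_data):
    # Hypothetical inference step using the trained artifact.
    return [x - model["mean"] for x in raw_data]

training = Pipeline(
    [node(train_model, inputs="raw_data", outputs="model", tags="training")]
)
inference = Pipeline(
    [node(predict, inputs=["model", "raw_data"], outputs="predictions", tags="inference")]
)

ml_pipeline = pipeline_ml(
    training=training,
    inference=inference,
    input_name="raw_data",
    conda_env={"python": "3.7.9", "dependencies": ["pandas>=1.0.0,<2.0.0"]},
    model_name="model",
)
```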
48 changes: 42 additions & 6 deletions kedro_mlflow/pipeline/pipeline_ml.py
@@ -1,5 +1,5 @@
from pathlib import Path
from typing import Callable, Iterable, Union
from typing import Any, Callable, Dict, Iterable, Optional, Union

from kedro.io import DataCatalog, MemoryDataSet
from kedro.pipeline import Pipeline
@@ -13,6 +13,7 @@ class PipelineML(Pipeline):
IMPORTANT NOTE : THIS CLASS IS NOT INTENDED TO BE USED DIRECTLY IN A KEDRO PROJECT. YOU SHOULD USE
THE ``pipeline_ml`` FUNCTION FOR MODULAR PIPELINES, WHICH IS MORE FLEXIBLE AND USER FRIENDLY.
SEE INSERT_DOC_URL

A ``PipelineML`` is a kedro ``Pipeline`` which we assume is a "training" (in the machine learning sense)
pipeline. Basically, "training" is a higher order function (it generates another function). It implies that:
- the outputs of this pipeline are considered as "fitted models", i.e. inputs
@@ -25,23 +26,58 @@ class PipelineML(Pipeline):
in mlflow easily. The goal is to call the ``MlflowPipelineHook`` hook after a ``PipelineML`` is run
in order to trigger mlflow packaging.


Arguments:
Pipeline {[type]} -- [description]
"""

def __init__(
self,
nodes: Iterable[Union[Node, "Pipeline"]],
nodes: Iterable[Union[Node, Pipeline]],
*args,
tags: Union[str, Iterable[str]] = None,
tags: Optional[Union[str, Iterable[str]]] = None,
inference: Pipeline,
input_name: str,
conda_env: Optional[Union[str, Path, Dict[str, Any]]] = None,
model_name: Optional[str] = "model",
):

"""Store all necessary information for calling mlflow.log_model in the pipeline.

Args:
nodes (Iterable[Union[Node, Pipeline]]): The `node`s
of the training pipeline.
tags (Union[str, Iterable[str]], optional): Optional
set of tags to be applied to all the pipeline
nodes. Defaults to None.
inference (Pipeline): A `Pipeline` object which will be
stored in mlflow and use the output(s)
of the training pipeline (namely, the model)
to predict the outcome.
input_name (str, optional): The name of the dataset in
the catalog.yml which the model's user must provide
for prediction (i.e. the data). Defaults to None.
conda_env (Union[str, Path, Dict[str, Any]], optional):
The minimal conda environment necessary for the
inference `Pipeline`. It can be either :
- a path to a "requirements.txt": In this case
the packages are parsed and a conda env with
your current python_version and these
dependencies is returned.
- a path to an "environment.yml" : the file is
uploaded "as is".
- a Dict : used as the environment
- None: a base conda environment with your
current python version and your project
version at training time.
Defaults to None.
model_name (Union[str, None], optional): The name of
the folder where the model will be stored in
remote mlflow. Defaults to "model".
"""

super().__init__(nodes, *args, tags=tags)

self.inference = inference
self.conda_env = conda_env
self.model_name = model_name

self._check_input_name(input_name)
self.input_name = input_name
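Once the hook has logged the inference pipeline, `model_name` is the artifact folder under the mlflow run, so it shows up in the model URI when loading the packaged model back (the run id and the input data below are hypothetical):

```python
import mlflow

# "model" here is the model_name / artifact_path chosen in PipelineML.
loaded_model = mlflow.pyfunc.load_model("runs:/<run_id>/model")
predictions = loaded_model.predict(data)  # data: whatever `input_name` refers to
```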
10 changes: 5 additions & 5 deletions tests/framework/hooks/test_pipeline_hook.py
@@ -69,9 +69,7 @@ def env_from_requirements(requirements_path, python_version):

@pytest.fixture
def env_from_dict(python_version):
env_from_dict = dict(
python=python_version, dependencies=["pandas>=1.0.0,<2.0.0", "kedro==0.15.9"]
)
env_from_dict = dict(python=python_version, dependencies=["pandas>=1.0.0,<2.0.0"])
return env_from_dict


@@ -159,12 +157,14 @@ def predict_fun(model, data):


@pytest.fixture
def dummy_pipeline_ml(dummy_pipeline):
def dummy_pipeline_ml(dummy_pipeline, env_from_dict):

dummy_pipeline_ml = pipeline_ml(
training=dummy_pipeline.only_nodes_with_tags("training"),
inference=dummy_pipeline.only_nodes_with_tags("inference"),
input_name="raw_data",
conda_env=env_from_dict,
model_name="model",
)
return dummy_pipeline_ml

@@ -242,7 +242,7 @@ def test_mlflow_pipeline_hook_with_different_pipeline_types(
# config_with_base_mlflow_conf is a conftest fixture
mocker.patch("kedro_mlflow.utils._is_kedro_project", return_value=True)
monkeypatch.chdir(tmp_path)
pipeline_hook = MlflowPipelineHook(conda_env=env_from_dict, model_name="model")
pipeline_hook = MlflowPipelineHook()
runner = SequentialRunner()
pipeline_hook.after_catalog_created(
catalog=dummy_catalog,