Skip to content

Commit

Permalink
Implement kfp runner with tests (#359)
Browse files Browse the repository at this point in the history
PR implementing the Kfp runner.

I had some issues adding the pipeline name to the kfp spec (now solved);
the name is needed so that a run can be launched with a sensible name.

Remarks:
- Removed some of our logic for deleting existing pipelines
(which, as far as I know, was not used)
- I tested it on a toy pipeline and was able to push a run to the
Kubeflow deployment on GCP
- I will implement the CLI (for both the runner and compiler) in a
separate PR
- I will also make a separate PR for cleaning up the kfp imports and
extras.
  • Loading branch information
GeorgesLorre authored Aug 16, 2023
1 parent 3e51dbc commit 06519b8
Show file tree
Hide file tree
Showing 6 changed files with 453 additions and 435 deletions.
83 changes: 44 additions & 39 deletions src/fondant/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,51 +242,56 @@ def compile(
pipeline: the pipeline to compile
output_path: the path where to save the Kubeflow pipeline spec
"""
self.pipeline = pipeline
self.pipeline.validate(run_id="{{workflow.name}}")
logger.info(f"Compiling {self.pipeline.name} to {output_path}")
wrapped_pipeline = (self.kfp.dsl.pipeline())(self.kfp_pipeline) # type: ignore
self.kfp.compiler.Compiler().compile(wrapped_pipeline, output_path) # type: ignore
logger.info("Pipeline compiled successfully")

def kfp_pipeline(self):
previous_component_task = None
manifest_path = ""
for component_name, component in self.pipeline._graph.items():
logger.info(f"Compiling service for {component_name}")
@self.kfp.dsl.pipeline(name=pipeline.name, description=pipeline.description)
def kfp_pipeline():
previous_component_task = None
manifest_path = ""
for component_name, component in self.pipeline._graph.items():
logger.info(f"Compiling service for {component_name}")

component_op = component["fondant_component_op"]
# convert ComponentOp to Kubeflow component
kubeflow_component_op = self.kfp.components.load_component(
text=component_op.component_spec.kubeflow_specification.to_string(),
)

component_op = component["fondant_component_op"]
# convert ComponentOp to Kubeflow component
kubeflow_component_op = self.kfp.components.load_component(
text=component_op.component_spec.kubeflow_specification.to_string(),
)
# Execute the Kubeflow component and pass in the output manifest path from
# the previous component.
component_args = component_op.arguments
metadata = json.dumps(
{
"base_path": self.pipeline.base_path,
"run_id": "{{workflow.name}}",
},
)

# Execute the Kubeflow component and pass in the output manifest path from
# the previous component.
component_args = component_op.arguments
metadata = json.dumps(
{"base_path": self.pipeline.base_path, "run_id": "{{workflow.name}}"},
)
component_task = kubeflow_component_op(
input_manifest_path=manifest_path,
metadata=metadata,
**component_args,
)
# Set optional configurations
component_task = self._set_configuration(
component_task,
component_op,
)
# Set the execution order of the component task to be after the previous
# component task.
if previous_component_task is not None:
component_task.after(previous_component_task)

component_task = kubeflow_component_op(
input_manifest_path=manifest_path,
metadata=metadata,
**component_args,
)
# Set optional configurations
component_task = self._set_configuration(
component_task,
component_op,
)
# Set the execution order of the component task to be after the previous
# component task.
if previous_component_task is not None:
component_task.after(previous_component_task)
# Update the manifest path to be the output path of the current component task.
manifest_path = component_task.outputs["output_manifest_path"]

# Update the manifest path to be the output path of the current component task.
manifest_path = component_task.outputs["output_manifest_path"]
previous_component_task = component_task

previous_component_task = component_task
self.pipeline = pipeline
self.pipeline.validate(run_id="{{workflow.name}}")
logger.info(f"Compiling {self.pipeline.name} to {output_path}")

self.kfp.compiler.Compiler().compile(kfp_pipeline, output_path) # type: ignore
logger.info("Pipeline compiled successfully")

def _set_configuration(self, task, fondant_component_operation):
# Unpack optional specifications
Expand Down
59 changes: 59 additions & 0 deletions src/fondant/runner.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import json
import logging
import subprocess # nosec
from abc import ABC, abstractmethod

import yaml

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -29,3 +32,59 @@ def run(self, input_spec: str, *args, **kwargs):
]

subprocess.call(cmd) # nosec


class KubeflowRunner(Runner):
    """Runner that submits a compiled pipeline spec to a Kubeflow Pipelines host.

    The kfp package is imported lazily on construction so that it is only
    required when this runner is actually used.

    Args:
        host: URL of the Kubeflow Pipelines host. It is passed to ``kfp.Client``
            and used to build the run URL that is logged after submission.
    """

    def __init__(self, host: str) -> None:
        self._resolve_imports()
        self.host = host
        self.client = self.kfp.Client(host=host)

    def _resolve_imports(self) -> None:
        """Import kfp and store the module on the instance as ``self.kfp``.

        Raises:
            ImportError: if kfp is not installed.
        """
        try:
            import kfp
        except ImportError as err:
            # NOTE(review): the extra name "pipelines" is taken from the original
            # message and the package name from the src/fondant path -- confirm
            # both against the project's packaging metadata.
            msg = (
                "You need to install kfp to use the Kubeflow runner,\n"
                "you can install it with `pip install fondant[pipelines]`"
            )
            # Chain the original error so the underlying import failure stays visible.
            raise ImportError(msg) from err

        self.kfp = kfp

    def run(
        self,
        input_spec: str,
        experiment_name: str = "Default",
        *args,
        **kwargs,
    ) -> None:
        """Run a kubeflow pipeline.

        Looks up the experiment by name (creating it when the lookup raises
        ``ValueError``), submits the spec as a run named ``<pipeline name>_run``
        and logs the URL of the run.

        Args:
            input_spec: path to the compiled Kubeflow pipeline spec file.
            experiment_name: name of the experiment to run the pipeline under.
            *args: accepted for interface compatibility; not used here.
            **kwargs: accepted for interface compatibility; not used here.
        """
        try:
            experiment = self.client.get_experiment(experiment_name=experiment_name)
        except ValueError:
            # The lookup failing with ValueError is treated as "experiment does
            # not exist yet", so one is created under the requested name.
            logger.info(
                f"Defined experiment '{experiment_name}' not found. Creating new experiment"
                f" under this name",
            )
            experiment = self.client.create_experiment(experiment_name)

        job_name = self.get_name_from_spec(input_spec) + "_run"
        # TODO add logic to see if pipeline exists
        runner = self.client.run_pipeline(
            experiment_id=experiment.id,
            job_name=job_name,
            pipeline_package_path=input_spec,
        )

        # Strip trailing slashes so a host such as "https://kfp.example/" does not
        # produce a "//#/runs/..." URL.
        base_url = self.host.rstrip("/")
        pipeline_url = f"{base_url}/#/runs/details/{runner.id}"
        logger.info(f"Pipeline is running at: {pipeline_url}")

    def get_name_from_spec(self, input_spec: str) -> str:
        """Get the name of the pipeline from the spec.

        The spec file is parsed as YAML; the pipeline name is read from the JSON
        document stored in the ``pipelines.kubeflow.org/pipeline_spec`` annotation.

        Args:
            input_spec: path to the compiled Kubeflow pipeline spec file.

        Returns:
            The value of the ``name`` field of the embedded pipeline spec.
        """
        with open(input_spec) as f:
            spec = yaml.safe_load(f)

        annotations = spec["metadata"]["annotations"]
        pipeline_spec = json.loads(annotations["pipelines.kubeflow.org/pipeline_spec"])
        return pipeline_spec["name"]
Loading

0 comments on commit 06519b8

Please sign in to comment.