Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement kfp runner with tests #359

Merged
merged 3 commits into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 44 additions & 39 deletions src/fondant/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,51 +242,56 @@ def compile(
pipeline: the pipeline to compile
output_path: the path where to save the Kubeflow pipeline spec
"""
self.pipeline = pipeline
self.pipeline.validate(run_id="{{workflow.name}}")
logger.info(f"Compiling {self.pipeline.name} to {output_path}")
wrapped_pipeline = (self.kfp.dsl.pipeline())(self.kfp_pipeline) # type: ignore
self.kfp.compiler.Compiler().compile(wrapped_pipeline, output_path) # type: ignore
logger.info("Pipeline compiled successfully")

def kfp_pipeline(self):
previous_component_task = None
manifest_path = ""
for component_name, component in self.pipeline._graph.items():
logger.info(f"Compiling service for {component_name}")
@self.kfp.dsl.pipeline(name=pipeline.name, description=pipeline.description)
def kfp_pipeline():
previous_component_task = None
manifest_path = ""
for component_name, component in self.pipeline._graph.items():
logger.info(f"Compiling service for {component_name}")

component_op = component["fondant_component_op"]
# convert ComponentOp to Kubeflow component
kubeflow_component_op = self.kfp.components.load_component(
text=component_op.component_spec.kubeflow_specification.to_string(),
)

component_op = component["fondant_component_op"]
# convert ComponentOp to Kubeflow component
kubeflow_component_op = self.kfp.components.load_component(
text=component_op.component_spec.kubeflow_specification.to_string(),
)
# Execute the Kubeflow component and pass in the output manifest path from
# the previous component.
component_args = component_op.arguments
metadata = json.dumps(
{
"base_path": self.pipeline.base_path,
"run_id": "{{workflow.name}}",
},
)

# Execute the Kubeflow component and pass in the output manifest path from
# the previous component.
component_args = component_op.arguments
metadata = json.dumps(
{"base_path": self.pipeline.base_path, "run_id": "{{workflow.name}}"},
)
component_task = kubeflow_component_op(
input_manifest_path=manifest_path,
metadata=metadata,
**component_args,
)
# Set optional configurations
component_task = self._set_configuration(
component_task,
component_op,
)
# Set the execution order of the component task to be after the previous
# component task.
if previous_component_task is not None:
component_task.after(previous_component_task)

component_task = kubeflow_component_op(
input_manifest_path=manifest_path,
metadata=metadata,
**component_args,
)
# Set optional configurations
component_task = self._set_configuration(
component_task,
component_op,
)
# Set the execution order of the component task to be after the previous
# component task.
if previous_component_task is not None:
component_task.after(previous_component_task)
# Update the manifest path to be the output path of the current component task.
manifest_path = component_task.outputs["output_manifest_path"]

# Update the manifest path to be the output path of the current component task.
manifest_path = component_task.outputs["output_manifest_path"]
previous_component_task = component_task

previous_component_task = component_task
self.pipeline = pipeline
self.pipeline.validate(run_id="{{workflow.name}}")
logger.info(f"Compiling {self.pipeline.name} to {output_path}")

self.kfp.compiler.Compiler().compile(kfp_pipeline, output_path) # type: ignore
logger.info("Pipeline compiled successfully")

def _set_configuration(self, task, fondant_component_operation):
# Unpack optional specifications
Expand Down
59 changes: 59 additions & 0 deletions src/fondant/runner.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import json
import logging
import subprocess # nosec
from abc import ABC, abstractmethod

import yaml

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -29,3 +32,59 @@ def run(self, input_spec: str, *args, **kwargs):
]

subprocess.call(cmd) # nosec


class KubeflowRunner(Runner):
    """Runner that submits a compiled pipeline spec to a Kubeflow Pipelines host."""

    def __init__(self, host: str):
        """
        Args:
            host: URL of the Kubeflow Pipelines host to connect to.
        """
        self._resolve_imports()
        self.host = host
        self.client = self.kfp.Client(host=host)

    def _resolve_imports(self):
        """Resolve imports for the Kubeflow runner.

        Raises:
            ImportError: if the optional `kfp` dependency is not installed.
        """
        try:
            import kfp

            self.kfp = kfp
        except ImportError:
            # Fixed message: this class is the runner (not the compiler), and
            # pip extras are installed as `package[extra]`, not `--extras`.
            msg = (
                "You need to install kfp to use the Kubeflow runner, "
                "you can install it with `pip install fondant[pipelines]`"
            )
            raise ImportError(
                msg,
            )

    def run(
        self,
        input_spec: str,
        experiment_name: str = "Default",
        *args,
        **kwargs,
    ):
        """Run a compiled kubeflow pipeline spec on the configured host.

        Args:
            input_spec: path to the compiled Kubeflow pipeline spec file.
            experiment_name: name of the Kubeflow experiment to run under;
                it is created if it does not exist yet.
        """
        try:
            experiment = self.client.get_experiment(experiment_name=experiment_name)
        except ValueError:
            # kfp raises ValueError when the experiment is unknown; create it.
            logger.info(
                f"Defined experiment '{experiment_name}' not found. Creating new experiment"
                f" under this name",
            )
            experiment = self.client.create_experiment(experiment_name)

        job_name = self.get_name_from_spec(input_spec) + "_run"
        # TODO add logic to see if pipeline exists
        runner = self.client.run_pipeline(
            experiment_id=experiment.id,
            job_name=job_name,
            pipeline_package_path=input_spec,
        )

        pipeline_url = f"{self.host}/#/runs/details/{runner.id}"
        logger.info(f"Pipeline is running at: {pipeline_url}")

    def get_name_from_spec(self, input_spec: str):
        """Get the name of the pipeline from the compiled spec.

        The name is read from the `pipelines.kubeflow.org/pipeline_spec`
        annotation, which holds a JSON-encoded spec inside the YAML file.

        Args:
            input_spec: path to the compiled Kubeflow pipeline spec file.
        """
        with open(input_spec) as f:
            spec = yaml.safe_load(f)
        return json.loads(
            spec["metadata"]["annotations"]["pipelines.kubeflow.org/pipeline_spec"],
        )["name"]
Loading