diff --git a/src/fondant/compiler.py b/src/fondant/compiler.py index d8e85a39b..51179b284 100644 --- a/src/fondant/compiler.py +++ b/src/fondant/compiler.py @@ -242,51 +242,56 @@ def compile( pipeline: the pipeline to compile output_path: the path where to save the Kubeflow pipeline spec """ - self.pipeline = pipeline - self.pipeline.validate(run_id="{{workflow.name}}") - logger.info(f"Compiling {self.pipeline.name} to {output_path}") - wrapped_pipeline = (self.kfp.dsl.pipeline())(self.kfp_pipeline) # type: ignore - self.kfp.compiler.Compiler().compile(wrapped_pipeline, output_path) # type: ignore - logger.info("Pipeline compiled successfully") - def kfp_pipeline(self): - previous_component_task = None - manifest_path = "" - for component_name, component in self.pipeline._graph.items(): - logger.info(f"Compiling service for {component_name}") + @self.kfp.dsl.pipeline(name=pipeline.name, description=pipeline.description) + def kfp_pipeline(): + previous_component_task = None + manifest_path = "" + for component_name, component in self.pipeline._graph.items(): + logger.info(f"Compiling service for {component_name}") + + component_op = component["fondant_component_op"] + # convert ComponentOp to Kubeflow component + kubeflow_component_op = self.kfp.components.load_component( + text=component_op.component_spec.kubeflow_specification.to_string(), + ) - component_op = component["fondant_component_op"] - # convert ComponentOp to Kubeflow component - kubeflow_component_op = self.kfp.components.load_component( - text=component_op.component_spec.kubeflow_specification.to_string(), - ) + # Execute the Kubeflow component and pass in the output manifest path from + # the previous component. + component_args = component_op.arguments + metadata = json.dumps( + { + "base_path": self.pipeline.base_path, + "run_id": "{{workflow.name}}", + }, + ) - # Execute the Kubeflow component and pass in the output manifest path from - # the previous component. - component_args = component_op.arguments - metadata = json.dumps( - {"base_path": self.pipeline.base_path, "run_id": "{{workflow.name}}"}, - ) + component_task = kubeflow_component_op( + input_manifest_path=manifest_path, + metadata=metadata, + **component_args, + ) + # Set optional configurations + component_task = self._set_configuration( + component_task, + component_op, + ) + # Set the execution order of the component task to be after the previous + # component task. + if previous_component_task is not None: + component_task.after(previous_component_task) - component_task = kubeflow_component_op( - input_manifest_path=manifest_path, - metadata=metadata, - **component_args, - ) - # Set optional configurations - component_task = self._set_configuration( - component_task, - component_op, - ) - # Set the execution order of the component task to be after the previous - # component task. - if previous_component_task is not None: - component_task.after(previous_component_task) + # Update the manifest path to be the output path of the current component task. + manifest_path = component_task.outputs["output_manifest_path"] - # Update the manifest path to be the output path of the current component task. - manifest_path = component_task.outputs["output_manifest_path"] + previous_component_task = component_task - previous_component_task = component_task + self.pipeline = pipeline + self.pipeline.validate(run_id="{{workflow.name}}") + logger.info(f"Compiling {self.pipeline.name} to {output_path}") + + self.kfp.compiler.Compiler().compile(kfp_pipeline, output_path) # type: ignore + logger.info("Pipeline compiled successfully") def _set_configuration(self, task, fondant_component_operation): # Unpack optional specifications diff --git a/src/fondant/runner.py b/src/fondant/runner.py index a5311f7dc..75b75aee4 100644 --- a/src/fondant/runner.py +++ b/src/fondant/runner.py @@ -1,7 +1,10 @@ +import json import logging import subprocess # nosec from abc import ABC, abstractmethod +import yaml + logger = logging.getLogger(__name__) @@ -29,3 +32,59 @@ def run(self, input_spec: str, *args, **kwargs): ] subprocess.call(cmd) # nosec + + +class KubeflowRunner(Runner): + def __init__(self, host: str): + self._resolve_imports() + self.host = host + self.client = self.kfp.Client(host=host) + + def _resolve_imports(self): + """Resolve imports for the Kubeflow compiler.""" + try: + import kfp + + self.kfp = kfp + except ImportError: + msg = """You need to install kfp to use the Kubeflow compiler,\n + you can install it with `pip install --extras pipelines`""" + raise ImportError( + msg, + ) + + def run( + self, + input_spec: str, + experiment_name: str = "Default", + *args, + **kwargs, + ): + """Run a kubeflow pipeline.""" + try: + experiment = self.client.get_experiment(experiment_name=experiment_name) + except ValueError: + logger.info( + f"Defined experiment '{experiment_name}' not found. Creating new experiment" + f" under this name", + ) + experiment = self.client.create_experiment(experiment_name) + + job_name = self.get_name_from_spec(input_spec) + "_run" + # TODO add logic to see if pipeline exists + runner = self.client.run_pipeline( + experiment_id=experiment.id, + job_name=job_name, + pipeline_package_path=input_spec, + ) + + pipeline_url = f"{self.host}/#/runs/details/{runner.id}" + logger.info(f"Pipeline is running at: {pipeline_url}") + + def get_name_from_spec(self, input_spec: str): + """Get the name of the pipeline from the spec.""" + with open(input_spec) as f: + spec = yaml.safe_load(f) + return json.loads( + spec["metadata"]["annotations"]["pipelines.kubeflow.org/pipeline_spec"], + )["name"] diff --git a/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml index d34a20c9e..acdb7ac69 100644 --- a/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml @@ -1,65 +1,47 @@ apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: - annotations: - pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 - pipelines.kubeflow.org/pipeline_compilation_time: '2023-01-01T00:00:00' - pipelines.kubeflow.org/pipeline_spec: '{"name": "Kfp pipeline"}' - generateName: kfp-pipeline- - labels: - pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + generateName: test-pipeline- + annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22, pipelines.kubeflow.org/pipeline_compilation_time: '2023-01-01T00:00:00', + pipelines.kubeflow.org/pipeline_spec: '{"description": "description of the test + pipeline", "name": "test_pipeline"}'} + labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22} spec: - arguments: - parameters: [] - entrypoint: kfp-pipeline - serviceAccountName: pipeline-runner + entrypoint: test-pipeline templates: - - container: + - name: first-component + container: args: [] - command: - - python3 - - main.py - - --input_manifest_path - - /tmp/inputs/input_manifest_path/data - - --metadata - - '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}' - - --component_spec - - '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, - "description": "This is an example component", "image": "example_component:latest", - "name": "First component", "produces": {"captions": {"fields": {"data": {"type": - "string"}}}, "images": {"fields": {"data": {"type": "binary"}}}}}' - - --input_partition_rows - - disable - - --storage_args - - a dummy string arg - - --output_manifest_path - - /tmp/outputs/output_manifest_path/data + command: [python3, main.py, --input_manifest_path, /tmp/inputs/input_manifest_path/data, + --metadata, '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}', --component_spec, + '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, + "description": "This is an example component", "image": "example_component:latest", + "name": "First component", "produces": {"captions": {"fields": {"data": + {"type": "string"}}}, "images": {"fields": {"data": {"type": "binary"}}}}}', + --input_partition_rows, disable, --storage_args, a dummy string arg, --output_manifest_path, + /tmp/outputs/output_manifest_path/data] image: example_component:latest inputs: artifacts: - name: input_manifest_path path: /tmp/inputs/input_manifest_path/data - raw: - data: '' + raw: {data: ''} + outputs: + artifacts: + - {name: first-component-output_manifest_path, path: /tmp/outputs/output_manifest_path/data} metadata: - annotations: - pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": - {\"storage_args\": {\"description\": \"Storage arguments\", \"type\": \"str\"}}, - \"description\": \"This is an example component\", \"image\": \"example_component:latest\", - \"name\": \"First component\", \"produces\": {\"captions\": {\"fields\": - {\"data\": {\"type\": \"string\"}}}, \"images\": {\"fields\": {\"data\": - {\"type\": \"binary\"}}}}}", "input_partition_rows": "disable", "metadata": - "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", "storage_args": - "a dummy string arg"}' - pipelines.kubeflow.org/component_ref: '{"digest": "2a304ce49a15404ba50dfd8b56ec43fa8ac8c29f80579d1c8fb974d3f1a5c87f"}' - pipelines.kubeflow.org/component_spec: '{"description": "This is an example - component", "implementation": {"container": {"command": ["python3", "main.py", - "--input_manifest_path", {"inputPath": "input_manifest_path"}, "--metadata", - {"inputValue": "metadata"}, "--component_spec", {"inputValue": "component_spec"}, - "--input_partition_rows", {"inputValue": "input_partition_rows"}, "--storage_args", - {"inputValue": "storage_args"}, "--output_manifest_path", {"outputPath": - "output_manifest_path"}], "image": "example_component:latest"}}, "inputs": - [{"description": "Path to the input manifest", "name": "input_manifest_path", + labels: + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + pipelines.kubeflow.org/enable_caching: "true" + annotations: {pipelines.kubeflow.org/component_spec: '{"description": "This + is an example component", "implementation": {"container": {"command": ["python3", + "main.py", "--input_manifest_path", {"inputPath": "input_manifest_path"}, + "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": + "component_spec"}, "--input_partition_rows", {"inputValue": "input_partition_rows"}, + "--storage_args", {"inputValue": "storage_args"}, "--output_manifest_path", + {"outputPath": "output_manifest_path"}], "image": "example_component:latest"}}, + "inputs": [{"description": "Path to the input manifest", "name": "input_manifest_path", "type": "String"}, {"description": "Metadata arguments containing the run id and base path", "name": "metadata", "type": "String"}, {"default": "None", "description": "The component specification as a dictionary", "name": "component_spec", @@ -68,82 +50,46 @@ spec: "name": "input_partition_rows", "type": "String"}, {"description": "Storage arguments", "name": "storage_args", "type": "String"}], "name": "First component", "outputs": [{"description": "Path to the output manifest", "name": "output_manifest_path", - "type": "String"}]}' - labels: - pipelines.kubeflow.org/enable_caching: 'true' - pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 - pipelines.kubeflow.org/pipeline-sdk-type: kfp - name: first-component - outputs: - artifacts: - - name: first-component-output_manifest_path - path: /tmp/outputs/output_manifest_path/data - - dag: - tasks: - - name: first-component - template: first-component - - arguments: - artifacts: - - from: '{{tasks.first-component.outputs.artifacts.first-component-output_manifest_path}}' - name: first-component-output_manifest_path - dependencies: - - first-component - name: second-component - template: second-component - - arguments: - artifacts: - - from: '{{tasks.second-component.outputs.artifacts.second-component-output_manifest_path}}' - name: second-component-output_manifest_path - dependencies: - - second-component - name: third-component - template: third-component - name: kfp-pipeline - - container: + "type": "String"}]}', pipelines.kubeflow.org/component_ref: '{"digest": + "2a304ce49a15404ba50dfd8b56ec43fa8ac8c29f80579d1c8fb974d3f1a5c87f"}', pipelines.kubeflow.org/arguments.parameters: '{"component_spec": + "{\"args\": {\"storage_args\": {\"description\": \"Storage arguments\", + \"type\": \"str\"}}, \"description\": \"This is an example component\", + \"image\": \"example_component:latest\", \"name\": \"First component\", + \"produces\": {\"captions\": {\"fields\": {\"data\": {\"type\": \"string\"}}}, + \"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}}", "input_partition_rows": + "disable", "metadata": "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", + "storage_args": "a dummy string arg"}'} + - name: second-component + container: args: [] - command: - - python3 - - main.py - - --input_manifest_path - - /tmp/inputs/input_manifest_path/data - - --metadata - - '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}' - - --component_spec - - '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, - "consumes": {"images": {"fields": {"data": {"type": "binary"}}}}, "description": - "This is an example component", "image": "example_component:latest", "name": - "Second component", "produces": {"embeddings": {"fields": {"data": {"items": - {"type": "float32"}, "type": "array"}}}}}' - - --input_partition_rows - - '10' - - --storage_args - - a dummy string arg - - --output_manifest_path - - /tmp/outputs/output_manifest_path/data + command: [python3, main.py, --input_manifest_path, /tmp/inputs/input_manifest_path/data, + --metadata, '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}', --component_spec, + '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, + "consumes": {"images": {"fields": {"data": {"type": "binary"}}}}, "description": + "This is an example component", "image": "example_component:latest", "name": + "Second component", "produces": {"embeddings": {"fields": {"data": {"items": + {"type": "float32"}, "type": "array"}}}}}', --input_partition_rows, '10', + --storage_args, a dummy string arg, --output_manifest_path, /tmp/outputs/output_manifest_path/data] image: example_component:latest inputs: artifacts: - - name: first-component-output_manifest_path - path: /tmp/inputs/input_manifest_path/data + - {name: first-component-output_manifest_path, path: /tmp/inputs/input_manifest_path/data} + outputs: + artifacts: + - {name: second-component-output_manifest_path, path: /tmp/outputs/output_manifest_path/data} metadata: - annotations: - pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": - {\"storage_args\": {\"description\": \"Storage arguments\", \"type\": \"str\"}}, - \"consumes\": {\"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}, - \"description\": \"This is an example component\", \"image\": \"example_component:latest\", - \"name\": \"Second component\", \"produces\": {\"embeddings\": {\"fields\": - {\"data\": {\"items\": {\"type\": \"float32\"}, \"type\": \"array\"}}}}}", - "input_partition_rows": "10", "metadata": "{\"base_path\": \"/foo/bar\", - \"run_id\": \"{{workflow.name}}\"}", "storage_args": "a dummy string arg"}' - pipelines.kubeflow.org/component_ref: '{"digest": "a02b0189397a2d9318982201f020dbbbe3962427ed150fe58cc69ff508cc68bb"}' - pipelines.kubeflow.org/component_spec: '{"description": "This is an example - component", "implementation": {"container": {"command": ["python3", "main.py", - "--input_manifest_path", {"inputPath": "input_manifest_path"}, "--metadata", - {"inputValue": "metadata"}, "--component_spec", {"inputValue": "component_spec"}, - "--input_partition_rows", {"inputValue": "input_partition_rows"}, "--storage_args", - {"inputValue": "storage_args"}, "--output_manifest_path", {"outputPath": - "output_manifest_path"}], "image": "example_component:latest"}}, "inputs": - [{"description": "Path to the input manifest", "name": "input_manifest_path", + labels: + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + pipelines.kubeflow.org/enable_caching: "true" + annotations: {pipelines.kubeflow.org/component_spec: '{"description": "This + is an example component", "implementation": {"container": {"command": ["python3", + "main.py", "--input_manifest_path", {"inputPath": "input_manifest_path"}, + "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": + "component_spec"}, "--input_partition_rows", {"inputValue": "input_partition_rows"}, + "--storage_args", {"inputValue": "storage_args"}, "--output_manifest_path", + {"outputPath": "output_manifest_path"}], "image": "example_component:latest"}}, + "inputs": [{"description": "Path to the input manifest", "name": "input_manifest_path", "type": "String"}, {"description": "Metadata arguments containing the run id and base path", "name": "metadata", "type": "String"}, {"default": "None", "description": "The component specification as a dictionary", "name": "component_spec", @@ -152,86 +98,89 @@ spec: "name": "input_partition_rows", "type": "String"}, {"description": "Storage arguments", "name": "storage_args", "type": "String"}], "name": "Second component", "outputs": [{"description": "Path to the output manifest", "name": - "output_manifest_path", "type": "String"}]}' - labels: - pipelines.kubeflow.org/enable_caching: 'true' - pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 - pipelines.kubeflow.org/pipeline-sdk-type: kfp - name: second-component - outputs: - artifacts: - - name: second-component-output_manifest_path - path: /tmp/outputs/output_manifest_path/data - - container: + "output_manifest_path", "type": "String"}]}', pipelines.kubeflow.org/component_ref: '{"digest": + "a02b0189397a2d9318982201f020dbbbe3962427ed150fe58cc69ff508cc68bb"}', pipelines.kubeflow.org/arguments.parameters: '{"component_spec": + "{\"args\": {\"storage_args\": {\"description\": \"Storage arguments\", + \"type\": \"str\"}}, \"consumes\": {\"images\": {\"fields\": {\"data\": + {\"type\": \"binary\"}}}}, \"description\": \"This is an example component\", + \"image\": \"example_component:latest\", \"name\": \"Second component\", + \"produces\": {\"embeddings\": {\"fields\": {\"data\": {\"items\": {\"type\": + \"float32\"}, \"type\": \"array\"}}}}}", "input_partition_rows": "10", "metadata": + "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", "storage_args": + "a dummy string arg"}'} + - name: test-pipeline + dag: + tasks: + - {name: first-component, template: first-component} + - name: second-component + template: second-component + dependencies: [first-component] + arguments: + artifacts: + - {name: first-component-output_manifest_path, from: '{{tasks.first-component.outputs.artifacts.first-component-output_manifest_path}}'} + - name: third-component + template: third-component + dependencies: [second-component] + arguments: + artifacts: + - {name: second-component-output_manifest_path, from: '{{tasks.second-component.outputs.artifacts.second-component-output_manifest_path}}'} + - name: third-component + container: args: [] - command: - - python3 - - main.py - - --input_manifest_path - - /tmp/inputs/input_manifest_path/data - - --metadata - - '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}' - - --component_spec - - '{"args": {"some_list": {"description": "Some list", "items": {"type": "int"}, - "type": "list"}, "storage_args": {"description": "Storage arguments", "type": - "str"}}, "consumes": {"captions": {"fields": {"data": {"type": "string"}}}, - "embeddings": {"fields": {"data": {"items": {"type": "float32"}, "type": "array"}}}, - "images": {"fields": {"data": {"type": "binary"}}}}, "description": "This - is an example component", "image": "example_component:latest", "name": "Third - component", "produces": {"additionalSubsets": false, "images": {"fields": - {"data": {"type": "binary"}}}}}' - - --input_partition_rows - - None - - --storage_args - - a dummy string arg - - --some_list - - '[1, 2, 3]' - - --output_manifest_path - - /tmp/outputs/output_manifest_path/data + command: [python3, main.py, --input_manifest_path, /tmp/inputs/input_manifest_path/data, + --metadata, '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}', --component_spec, + '{"args": {"some_list": {"description": "Some list", "items": {"type": "int"}, + "type": "list"}, "storage_args": {"description": "Storage arguments", "type": + "str"}}, "consumes": {"captions": {"fields": {"data": {"type": "string"}}}, + "embeddings": {"fields": {"data": {"items": {"type": "float32"}, "type": + "array"}}}, "images": {"fields": {"data": {"type": "binary"}}}}, "description": + "This is an example component", "image": "example_component:latest", "name": + "Third component", "produces": {"additionalSubsets": false, "images": {"fields": + {"data": {"type": "binary"}}}}}', --input_partition_rows, None, --storage_args, + a dummy string arg, --some_list, '[1, 2, 3]', --output_manifest_path, /tmp/outputs/output_manifest_path/data] image: example_component:latest inputs: artifacts: - - name: second-component-output_manifest_path - path: /tmp/inputs/input_manifest_path/data + - {name: second-component-output_manifest_path, path: /tmp/inputs/input_manifest_path/data} + outputs: + artifacts: + - {name: third-component-output_manifest_path, path: /tmp/outputs/output_manifest_path/data} metadata: - annotations: - pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": - {\"some_list\": {\"description\": \"Some list\", \"items\": {\"type\": \"int\"}, - \"type\": \"list\"}, \"storage_args\": {\"description\": \"Storage arguments\", - \"type\": \"str\"}}, \"consumes\": {\"captions\": {\"fields\": {\"data\": - {\"type\": \"string\"}}}, \"embeddings\": {\"fields\": {\"data\": {\"items\": - {\"type\": \"float32\"}, \"type\": \"array\"}}}, \"images\": {\"fields\": - {\"data\": {\"type\": \"binary\"}}}}, \"description\": \"This is an example - component\", \"image\": \"example_component:latest\", \"name\": \"Third - component\", \"produces\": {\"additionalSubsets\": false, \"images\": {\"fields\": - {\"data\": {\"type\": \"binary\"}}}}}", "input_partition_rows": "None", - "metadata": "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", - "some_list": "[1, 2, 3]", "storage_args": "a dummy string arg"}' - pipelines.kubeflow.org/component_ref: '{"digest": "253932349a663809f2ea6fcf63ebd58f963881c6960435269d3fbe3eb17dcf53"}' - pipelines.kubeflow.org/component_spec: '{"description": "This is an example - component", "implementation": {"container": {"command": ["python3", "main.py", - "--input_manifest_path", {"inputPath": "input_manifest_path"}, "--metadata", - {"inputValue": "metadata"}, "--component_spec", {"inputValue": "component_spec"}, - "--input_partition_rows", {"inputValue": "input_partition_rows"}, "--storage_args", - {"inputValue": "storage_args"}, "--some_list", {"inputValue": "some_list"}, - "--output_manifest_path", {"outputPath": "output_manifest_path"}], "image": - "example_component:latest"}}, "inputs": [{"description": "Path to the input - manifest", "name": "input_manifest_path", "type": "String"}, {"description": - "Metadata arguments containing the run id and base path", "name": "metadata", - "type": "String"}, {"default": "None", "description": "The component specification - as a dictionary", "name": "component_spec", "type": "JsonObject"}, {"default": - "None", "description": "The number of rows to load per partition. Set to - override the automatic partitioning", "name": "input_partition_rows", "type": - "String"}, {"description": "Storage arguments", "name": "storage_args", - "type": "String"}, {"description": "Some list", "name": "some_list", "type": - "JsonArray"}], "name": "Third component", "outputs": [{"description": "Path - to the output manifest", "name": "output_manifest_path", "type": "String"}]}' labels: - pipelines.kubeflow.org/enable_caching: 'true' pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 pipelines.kubeflow.org/pipeline-sdk-type: kfp - name: third-component - outputs: - artifacts: - - name: third-component-output_manifest_path - path: /tmp/outputs/output_manifest_path/data + pipelines.kubeflow.org/enable_caching: "true" + annotations: {pipelines.kubeflow.org/component_spec: '{"description": "This + is an example component", "implementation": {"container": {"command": ["python3", + "main.py", "--input_manifest_path", {"inputPath": "input_manifest_path"}, + "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": + "component_spec"}, "--input_partition_rows", {"inputValue": "input_partition_rows"}, + "--storage_args", {"inputValue": "storage_args"}, "--some_list", {"inputValue": + "some_list"}, "--output_manifest_path", {"outputPath": "output_manifest_path"}], + "image": "example_component:latest"}}, "inputs": [{"description": "Path + to the input manifest", "name": "input_manifest_path", "type": "String"}, + {"description": "Metadata arguments containing the run id and base path", + "name": "metadata", "type": "String"}, {"default": "None", "description": + "The component specification as a dictionary", "name": "component_spec", + "type": "JsonObject"}, {"default": "None", "description": "The number of + rows to load per partition. Set to override the automatic partitioning", + "name": "input_partition_rows", "type": "String"}, {"description": "Storage + arguments", "name": "storage_args", "type": "String"}, {"description": "Some + list", "name": "some_list", "type": "JsonArray"}], "name": "Third component", + "outputs": [{"description": "Path to the output manifest", "name": "output_manifest_path", + "type": "String"}]}', pipelines.kubeflow.org/component_ref: '{"digest": + "253932349a663809f2ea6fcf63ebd58f963881c6960435269d3fbe3eb17dcf53"}', pipelines.kubeflow.org/arguments.parameters: '{"component_spec": + "{\"args\": {\"some_list\": {\"description\": \"Some list\", \"items\": + {\"type\": \"int\"}, \"type\": \"list\"}, \"storage_args\": {\"description\": + \"Storage arguments\", \"type\": \"str\"}}, \"consumes\": {\"captions\": + {\"fields\": {\"data\": {\"type\": \"string\"}}}, \"embeddings\": {\"fields\": + {\"data\": {\"items\": {\"type\": \"float32\"}, \"type\": \"array\"}}}, + \"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}, \"description\": + \"This is an example component\", \"image\": \"example_component:latest\", + \"name\": \"Third component\", \"produces\": {\"additionalSubsets\": false, + \"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}}", "input_partition_rows": + "None", "metadata": "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", + "some_list": "[1, 2, 3]", "storage_args": "a dummy string arg"}'} + arguments: + parameters: [] + serviceAccountName: pipeline-runner diff --git a/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml index 68ae61bda..67d306139 100644 --- a/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml @@ -1,65 +1,47 @@ apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: - annotations: - pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 - pipelines.kubeflow.org/pipeline_compilation_time: '2023-01-01T00:00:00' - pipelines.kubeflow.org/pipeline_spec: '{"name": "Kfp pipeline"}' - generateName: kfp-pipeline- - labels: - pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + generateName: test-pipeline- + annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22, pipelines.kubeflow.org/pipeline_compilation_time: '2023-01-01T00:00:00', + pipelines.kubeflow.org/pipeline_spec: '{"description": "description of the test + pipeline", "name": "test_pipeline"}'} + labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22} spec: - arguments: - parameters: [] - entrypoint: kfp-pipeline - serviceAccountName: pipeline-runner + entrypoint: test-pipeline templates: - - container: + - name: first-component + container: args: [] - command: - - python3 - - main.py - - --input_manifest_path - - /tmp/inputs/input_manifest_path/data - - --metadata - - '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}' - - --component_spec - - '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, - "description": "This is an example component", "image": "example_component:latest", - "name": "First component", "produces": {"captions": {"fields": {"data": {"type": - "string"}}}, "images": {"fields": {"data": {"type": "binary"}}}}}' - - --input_partition_rows - - None - - --storage_args - - a dummy string arg - - --output_manifest_path - - /tmp/outputs/output_manifest_path/data + command: [python3, main.py, --input_manifest_path, /tmp/inputs/input_manifest_path/data, + --metadata, '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}', --component_spec, + '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, + "description": "This is an example component", "image": "example_component:latest", + "name": "First component", "produces": {"captions": {"fields": {"data": + {"type": "string"}}}, "images": {"fields": {"data": {"type": "binary"}}}}}', + --input_partition_rows, None, --storage_args, a dummy string arg, --output_manifest_path, + /tmp/outputs/output_manifest_path/data] image: example_component:latest inputs: artifacts: - name: input_manifest_path path: /tmp/inputs/input_manifest_path/data - raw: - data: '' + raw: {data: ''} + outputs: + artifacts: + - {name: first-component-output_manifest_path, path: /tmp/outputs/output_manifest_path/data} metadata: - annotations: - pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": - {\"storage_args\": {\"description\": \"Storage arguments\", \"type\": \"str\"}}, - \"description\": \"This is an example component\", \"image\": \"example_component:latest\", - \"name\": \"First component\", \"produces\": {\"captions\": {\"fields\": - {\"data\": {\"type\": \"string\"}}}, \"images\": {\"fields\": {\"data\": - {\"type\": \"binary\"}}}}}", "input_partition_rows": "None", "metadata": - "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", "storage_args": - "a dummy string arg"}' - pipelines.kubeflow.org/component_ref: '{"digest": "2a304ce49a15404ba50dfd8b56ec43fa8ac8c29f80579d1c8fb974d3f1a5c87f"}' - pipelines.kubeflow.org/component_spec: '{"description": "This is an example - component", "implementation": {"container": {"command": ["python3", "main.py", - "--input_manifest_path", {"inputPath": "input_manifest_path"}, "--metadata", - {"inputValue": "metadata"}, "--component_spec", {"inputValue": "component_spec"}, - "--input_partition_rows", {"inputValue": "input_partition_rows"}, "--storage_args", - {"inputValue": "storage_args"}, "--output_manifest_path", {"outputPath": - "output_manifest_path"}], "image": "example_component:latest"}}, "inputs": - [{"description": "Path to the input manifest", "name": "input_manifest_path", + labels: + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + pipelines.kubeflow.org/enable_caching: "true" + annotations: {pipelines.kubeflow.org/component_spec: '{"description": "This + is an example component", "implementation": {"container": {"command": ["python3", + "main.py", "--input_manifest_path", {"inputPath": "input_manifest_path"}, + "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": + "component_spec"}, "--input_partition_rows", {"inputValue": "input_partition_rows"}, + "--storage_args", {"inputValue": "storage_args"}, "--output_manifest_path", + {"outputPath": "output_manifest_path"}], "image": "example_component:latest"}}, + "inputs": [{"description": "Path to the input manifest", "name": "input_manifest_path", "type": "String"}, {"description": "Metadata arguments containing the run id and base path", "name": "metadata", "type": "String"}, {"default": "None", "description": "The component specification as a dictionary", "name": "component_spec", @@ -68,69 +50,47 @@ spec: "name": "input_partition_rows", "type": "String"}, {"description": "Storage arguments", "name": "storage_args", "type": "String"}], "name": "First component", "outputs": [{"description": "Path to the output manifest", "name": "output_manifest_path", - "type": "String"}]}' - labels: - pipelines.kubeflow.org/enable_caching: 'true' - pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 - pipelines.kubeflow.org/pipeline-sdk-type: kfp - name: first-component - outputs: - artifacts: - - name: first-component-output_manifest_path - path: /tmp/outputs/output_manifest_path/data - - container: + "type": "String"}]}', pipelines.kubeflow.org/component_ref: '{"digest": + "2a304ce49a15404ba50dfd8b56ec43fa8ac8c29f80579d1c8fb974d3f1a5c87f"}', pipelines.kubeflow.org/arguments.parameters: '{"component_spec": + "{\"args\": {\"storage_args\": {\"description\": \"Storage arguments\", + \"type\": \"str\"}}, \"description\": \"This is an example component\", + \"image\": \"example_component:latest\", \"name\": \"First component\", + \"produces\": {\"captions\": {\"fields\": {\"data\": {\"type\": \"string\"}}}, + \"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}}", "input_partition_rows": + "None", "metadata": "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", + "storage_args": "a dummy string arg"}'} + - name: image-cropping + container: args: [] - command: - - python3 - - main.py - - --input_manifest_path - - /tmp/inputs/input_manifest_path/data - - --metadata - - '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}' - - --component_spec - - '{"args": {"cropping_threshold": {"default": -30, "description": "Threshold - parameter used for detecting borders. A lower (negative) parameter results - in a more performant border detection, but can cause overcropping. Default - is -30", "type": "int"}, "padding": {"default": 10, "description": "Padding - for the image cropping. The padding is added to all borders of the image.", - "type": "int"}}, "consumes": {"images": {"fields": {"data": {"type": "binary"}}}}, - "description": "Component that removes single-colored borders around images - and crops them appropriately", "image": "ghcr.io/ml6team/image_cropping:dev", - "name": "Image cropping", "produces": {"images": {"fields": {"data": {"type": - "binary"}, "height": {"type": "int32"}, "width": {"type": "int32"}}}}}' - - --input_partition_rows - - None - - --cropping_threshold - - '0' - - --padding - - '0' - - --output_manifest_path - - /tmp/outputs/output_manifest_path/data + command: [python3, main.py, --input_manifest_path, /tmp/inputs/input_manifest_path/data, + --metadata, '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}', --component_spec, + '{"args": {"cropping_threshold": {"default": -30, "description": "Threshold + parameter used for detecting borders. A lower (negative) parameter results + in a more performant border detection, but can cause overcropping. Default + is -30", "type": "int"}, "padding": {"default": 10, "description": "Padding + for the image cropping. The padding is added to all borders of the image.", + "type": "int"}}, "consumes": {"images": {"fields": {"data": {"type": "binary"}}}}, + "description": "Component that removes single-colored borders around images + and crops them appropriately", "image": "ghcr.io/ml6team/image_cropping:dev", + "name": "Image cropping", "produces": {"images": {"fields": {"data": {"type": + "binary"}, "height": {"type": "int32"}, "width": {"type": "int32"}}}}}', + --input_partition_rows, None, --cropping_threshold, '0', --padding, '0', --output_manifest_path, + /tmp/outputs/output_manifest_path/data] image: ghcr.io/ml6team/image_cropping:dev inputs: artifacts: - - name: first-component-output_manifest_path - path: /tmp/inputs/input_manifest_path/data + - {name: first-component-output_manifest_path, path: /tmp/inputs/input_manifest_path/data} + outputs: + artifacts: + - {name: image-cropping-output_manifest_path, path: /tmp/outputs/output_manifest_path/data} metadata: - annotations: - pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": - {\"cropping_threshold\": {\"default\": -30, \"description\": \"Threshold - parameter used for detecting borders. A lower (negative) parameter results - in a more performant border detection, but can cause overcropping. Default - is -30\", \"type\": \"int\"}, \"padding\": {\"default\": 10, \"description\": - \"Padding for the image cropping. The padding is added to all borders of - the image.\", \"type\": \"int\"}}, \"consumes\": {\"images\": {\"fields\": - {\"data\": {\"type\": \"binary\"}}}}, \"description\": \"Component that - removes single-colored borders around images and crops them appropriately\", - \"image\": \"ghcr.io/ml6team/image_cropping:dev\", \"name\": \"Image cropping\", - \"produces\": {\"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}, - \"height\": {\"type\": \"int32\"}, \"width\": {\"type\": \"int32\"}}}}}", - "cropping_threshold": "0", "input_partition_rows": "None", "metadata": "{\"base_path\": - \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", "padding": "0"}' - pipelines.kubeflow.org/component_ref: '{"digest": "e86f02b6b9cc878b6187e44bb3caf9291c3ce42c1939e19b0a97dacdc78a9d72"}' - pipelines.kubeflow.org/component_spec: '{"description": "Component that removes - single-colored borders around images and crops them appropriately", "implementation": - {"container": {"command": ["python3", "main.py", "--input_manifest_path", + labels: + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + pipelines.kubeflow.org/enable_caching: "true" + annotations: {pipelines.kubeflow.org/component_spec: '{"description": "Component + that removes single-colored borders around images and crops them appropriately", + "implementation": {"container": {"command": ["python3", "main.py", "--input_manifest_path", {"inputPath": "input_manifest_path"}, "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": "component_spec"}, "--input_partition_rows", {"inputValue": "input_partition_rows"}, "--cropping_threshold", {"inputValue": @@ -149,26 +109,31 @@ spec: 10, "description": "Padding for the image cropping. The padding is added to all borders of the image.", "name": "padding", "type": "Integer"}], "name": "Image cropping", "outputs": [{"description": "Path to the output manifest", - "name": "output_manifest_path", "type": "String"}]}' - labels: - pipelines.kubeflow.org/enable_caching: 'true' - pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 - pipelines.kubeflow.org/pipeline-sdk-type: kfp - name: image-cropping - outputs: - artifacts: - - name: image-cropping-output_manifest_path - path: /tmp/outputs/output_manifest_path/data - - dag: + "name": "output_manifest_path", "type": "String"}]}', pipelines.kubeflow.org/component_ref: '{"digest": + "e86f02b6b9cc878b6187e44bb3caf9291c3ce42c1939e19b0a97dacdc78a9d72"}', pipelines.kubeflow.org/arguments.parameters: '{"component_spec": + "{\"args\": {\"cropping_threshold\": {\"default\": -30, \"description\": + \"Threshold parameter used for detecting borders. A lower (negative) parameter + results in a more performant border detection, but can cause overcropping. + Default is -30\", \"type\": \"int\"}, \"padding\": {\"default\": 10, \"description\": + \"Padding for the image cropping. The padding is added to all borders of + the image.\", \"type\": \"int\"}}, \"consumes\": {\"images\": {\"fields\": + {\"data\": {\"type\": \"binary\"}}}}, \"description\": \"Component that + removes single-colored borders around images and crops them appropriately\", + \"image\": \"ghcr.io/ml6team/image_cropping:dev\", \"name\": \"Image cropping\", + \"produces\": {\"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}, + \"height\": {\"type\": \"int32\"}, \"width\": {\"type\": \"int32\"}}}}}", + "cropping_threshold": "0", "input_partition_rows": "None", "metadata": "{\"base_path\": + \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", "padding": "0"}'} + - name: test-pipeline + dag: tasks: - - name: first-component - template: first-component - - arguments: - artifacts: - - from: '{{tasks.first-component.outputs.artifacts.first-component-output_manifest_path}}' - name: first-component-output_manifest_path - dependencies: - - first-component - name: image-cropping + - {name: first-component, template: first-component} + - name: image-cropping template: image-cropping - name: kfp-pipeline + dependencies: [first-component] + arguments: + artifacts: + - {name: first-component-output_manifest_path, from: '{{tasks.first-component.outputs.artifacts.first-component-output_manifest_path}}'} + arguments: + parameters: [] + serviceAccountName: pipeline-runner diff --git a/tests/example_pipelines/compiled_pipeline/kubeflow_pipeline.yml b/tests/example_pipelines/compiled_pipeline/kubeflow_pipeline.yml index f8a97dae5..8383894d6 100644 --- a/tests/example_pipelines/compiled_pipeline/kubeflow_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/kubeflow_pipeline.yml @@ -1,68 +1,50 @@ apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: - annotations: - pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 - pipelines.kubeflow.org/pipeline_compilation_time: '2023-01-01T00:00:00' - pipelines.kubeflow.org/pipeline_spec: '{"name": "Kfp pipeline"}' - generateName: kfp-pipeline- - labels: - pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + generateName: test-pipeline- + annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22, pipelines.kubeflow.org/pipeline_compilation_time: '2023-01-01T00:00:00', + pipelines.kubeflow.org/pipeline_spec: '{"description": "description of the test + pipeline", "name": "test_pipeline"}'} + labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22} spec: - arguments: - parameters: [] - entrypoint: kfp-pipeline - serviceAccountName: pipeline-runner + entrypoint: test-pipeline templates: - - container: + - name: first-component + container: args: [] - command: - - python3 - - main.py - - --input_manifest_path - - /tmp/inputs/input_manifest_path/data - - --metadata - - '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}' - - --component_spec - - '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, - "description": "This is an example component", "image": "example_component:latest", - "name": "First component", "produces": {"captions": {"fields": {"data": {"type": - "string"}}}, "images": {"fields": {"data": {"type": "binary"}}}}}' - - --input_partition_rows - - None - - --storage_args - - a dummy string arg - - --output_manifest_path - - /tmp/outputs/output_manifest_path/data + command: [python3, main.py, --input_manifest_path, /tmp/inputs/input_manifest_path/data, + --metadata, '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}', --component_spec, + '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, + "description": "This is an example component", "image": "example_component:latest", + "name": "First component", "produces": {"captions": {"fields": {"data": + {"type": "string"}}}, "images": {"fields": {"data": {"type": "binary"}}}}}', + --input_partition_rows, None, --storage_args, a dummy string arg, --output_manifest_path, + /tmp/outputs/output_manifest_path/data] image: example_component:latest resources: - limits: - nvidia.com/gpu: 1 + limits: {nvidia.com/gpu: 1} inputs: artifacts: - name: input_manifest_path path: /tmp/inputs/input_manifest_path/data - raw: - data: '' + raw: {data: ''} + outputs: + artifacts: + - {name: first-component-output_manifest_path, path: /tmp/outputs/output_manifest_path/data} + nodeSelector: {node_pool: a_node_pool} metadata: - annotations: - pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": - {\"storage_args\": {\"description\": \"Storage arguments\", \"type\": \"str\"}}, - \"description\": \"This is an example component\", \"image\": \"example_component:latest\", - \"name\": \"First component\", \"produces\": {\"captions\": {\"fields\": - {\"data\": {\"type\": \"string\"}}}, \"images\": {\"fields\": {\"data\": - {\"type\": \"binary\"}}}}}", "input_partition_rows": "None", "metadata": - "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", "storage_args": - "a dummy string arg"}' - pipelines.kubeflow.org/component_ref: '{"digest": "2a304ce49a15404ba50dfd8b56ec43fa8ac8c29f80579d1c8fb974d3f1a5c87f"}' - pipelines.kubeflow.org/component_spec: '{"description": "This is an example - component", "implementation": {"container": {"command": ["python3", "main.py", - "--input_manifest_path", {"inputPath": "input_manifest_path"}, "--metadata", - {"inputValue": "metadata"}, "--component_spec", {"inputValue": "component_spec"}, - "--input_partition_rows", {"inputValue": "input_partition_rows"}, "--storage_args", - {"inputValue": "storage_args"}, "--output_manifest_path", {"outputPath": - "output_manifest_path"}], "image": "example_component:latest"}}, "inputs": - [{"description": "Path to the input manifest", "name": "input_manifest_path", + labels: + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + pipelines.kubeflow.org/enable_caching: "true" + annotations: {pipelines.kubeflow.org/component_spec: '{"description": "This + is an example component", "implementation": {"container": {"command": ["python3", + "main.py", "--input_manifest_path", {"inputPath": "input_manifest_path"}, + "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": + "component_spec"}, "--input_partition_rows", {"inputValue": "input_partition_rows"}, + "--storage_args", {"inputValue": "storage_args"}, "--output_manifest_path", + {"outputPath": "output_manifest_path"}], "image": "example_component:latest"}}, + "inputs": [{"description": "Path to the input manifest", "name": "input_manifest_path", "type": "String"}, {"description": "Metadata arguments containing the run id and base path", "name": "metadata", "type": "String"}, {"default": "None", "description": "The component specification as a dictionary", "name": "component_spec", @@ -71,20 +53,19 @@ spec: "name": "input_partition_rows", "type": "String"}, {"description": "Storage arguments", "name": "storage_args", "type": "String"}], "name": "First component", "outputs": [{"description": "Path to the output manifest", "name": "output_manifest_path", - "type": "String"}]}' - labels: - pipelines.kubeflow.org/enable_caching: 'true' - pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 - pipelines.kubeflow.org/pipeline-sdk-type: kfp - name: first-component - nodeSelector: - node_pool: a_node_pool - outputs: - artifacts: - - name: first-component-output_manifest_path - path: /tmp/outputs/output_manifest_path/data - - dag: + "type": "String"}]}', pipelines.kubeflow.org/component_ref: '{"digest": + "2a304ce49a15404ba50dfd8b56ec43fa8ac8c29f80579d1c8fb974d3f1a5c87f"}', pipelines.kubeflow.org/arguments.parameters: '{"component_spec": + "{\"args\": {\"storage_args\": {\"description\": \"Storage arguments\", + \"type\": \"str\"}}, \"description\": \"This is an example component\", + \"image\": \"example_component:latest\", \"name\": \"First component\", + \"produces\": {\"captions\": {\"fields\": {\"data\": {\"type\": \"string\"}}}, + \"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}}", "input_partition_rows": + "None", "metadata": "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", + "storage_args": "a dummy string arg"}'} + - name: test-pipeline + dag: tasks: - - name: first-component - template: first-component - name: kfp-pipeline + - {name: first-component, template: first-component} + arguments: + parameters: [] + serviceAccountName: pipeline-runner diff --git a/tests/test_runner.py b/tests/test_runner.py index 145c5f16e..975359db3 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -1,11 +1,17 @@ -from unittest.mock import patch +import sys +from pathlib import Path +from types import SimpleNamespace +from unittest import mock -from fondant.runner import DockerRunner +import pytest +from fondant.runner import DockerRunner, KubeflowRunner + +VALID_PIPELINE = Path("./tests/example_pipelines/compiled_pipeline/") def test_docker_runner(): """Test that the docker runner while mocking subprocess.call.""" - with patch("subprocess.call") as mock_call: + with mock.patch("subprocess.call") as mock_call: DockerRunner().run("some/path") mock_call.assert_called_once_with( [ @@ -20,3 +26,56 @@ def test_docker_runner(): "--remove-orphans", ], ) + + +class MockKfpClient: + def __init__(self, host): + self.host = host + self._experiments = {"Default": SimpleNamespace(id="123")} + + def get_experiment(self, experiment_name): + try: + return self._experiments[experiment_name] + except KeyError: + raise ValueError + + def create_experiment(self, experiment_name): + self._experiments[experiment_name] = SimpleNamespace(id="456") + return self.get_experiment(experiment_name) + + def run_pipeline(self, experiment_id, job_name, pipeline_package_path): + return SimpleNamespace(id="xyz") + + +def test_kubeflow_runner(): + input_spec_path = str(VALID_PIPELINE / "kubeflow_pipeline.yml") + with mock.patch( + "kfp.Client", + new=MockKfpClient, + ): + runner = KubeflowRunner(host="some_host") + runner.run(input_spec=input_spec_path) + + assert runner.client.host == "some_host" + + +def test_kubeflow_runner_new_experiment(): + input_spec_path = str(VALID_PIPELINE / "kubeflow_pipeline.yml") + with mock.patch( + "kfp.Client", + new=MockKfpClient, + ): + runner = KubeflowRunner(host="some_host") + runner.run(input_spec=input_spec_path, experiment_name="NewExperiment") + + +def test_kfp_import(): + """Test that the kfp import throws the correct error.""" + with mock.patch.dict(sys.modules), mock.patch( + "kfp.Client", + new=MockKfpClient, + ): + # remove kfp from the modules + sys.modules["kfp"] = None + with pytest.raises(ImportError): + _ = KubeflowRunner(host="some_host")