Skip to content

Commit

Permalink
Uses new KFP API to create a run using pipeline versions.
Browse files Browse the repository at this point in the history
The existing KFP API doesn't use the updated pipeline, so we should use the new API to create a run for the updated pipeline.
context: kubeflow/pipelines#3418
PiperOrigin-RevId: 304570681
  • Loading branch information
jiyongjung authored and tensorflow-extended-team committed Apr 8, 2020
1 parent 30a57c2 commit c74ef6b
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 24 deletions.
2 changes: 1 addition & 1 deletion tfx/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def make_required_test_packages():
return [
'apache-airflow>=1.10,<2',
# LINT.IfChange
'kfp>=0.3.0,<0.4; python_version >= "3.0"',
'kfp>=0.4.0,<0.5; python_version >= "3.0"',
# LINT.ThenChange(
# testing/github/common.sh,
# testing/github/ubuntu/image/image.sh,
Expand Down
9 changes: 6 additions & 3 deletions tfx/tools/cli/e2e/cli_common_e2e_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,22 @@ def testMissingRequiredFlag(self):
result = self.runner.invoke(cli_group,
['pipeline', 'create', '--engine', 'beam'])
self.assertIn('CLI', result.output)
self.assertIn('Missing option "--pipeline_path"', result.output)
self.assertIn('Missing option', result.output)
self.assertIn('--pipeline_path', result.output)

# Missing flag for run create.
result = self.runner.invoke(cli_group,
['run', 'create', '--engine', 'airflow'])
self.assertIn('CLI', result.output)
self.assertIn('Missing option "--pipeline_name"', result.output)
self.assertIn('Missing option', result.output)
self.assertIn('--pipeline_name', result.output)

# Missing flag for run status.
result = self.runner.invoke(
cli_group, ['run', 'status', '--pipeline_name', pipeline_name_1])
self.assertIn('CLI', result.output)
self.assertIn('Missing option "--run_id"', result.output)
self.assertIn('Missing option', result.output)
self.assertIn('--run_id', result.output)


if __name__ == '__main__':
Expand Down
62 changes: 45 additions & 17 deletions tfx/tools/cli/handler/kubeflow_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import subprocess
import sys
import time
from typing import Any, Dict, Optional, Text, Tuple
from typing import Any, Dict, Optional, Text

import click
import kfp
Expand Down Expand Up @@ -114,14 +114,14 @@ def delete_pipeline(self) -> None:

pipeline_name = self.flags_dict[labels.PIPELINE_NAME]
# Check if pipeline exists on server.
pipeline_id, experiment_id = self._get_pipeline_id_and_experiment_id(
pipeline_name)
pipeline_id = self._get_pipeline_id(pipeline_name)
self._client._pipelines_api.get_pipeline(pipeline_id) # pylint: disable=protected-access

# Delete pipeline for kfp server.
self._client._pipelines_api.delete_pipeline(id=pipeline_id) # pylint: disable=protected-access

# Delete experiment from server.
experiment_id = self._get_experiment_id(pipeline_name)
self._client._experiment_api.delete_experiment(experiment_id) # pylint: disable=protected-access

# Path to pipeline folder.
Expand Down Expand Up @@ -155,14 +155,21 @@ def create_run(self) -> None:
pipeline_name = self.flags_dict[labels.PIPELINE_NAME]
experiment_name = pipeline_name

pipeline_id, experiment_id = self._get_pipeline_id_and_experiment_id(
pipeline_name)
pipeline_version_id = self._get_pipeline_version_id(pipeline_name)
experiment_id = self._get_experiment_id(pipeline_name)

# Run pipeline.
run = self._client.run_pipeline(
experiment_id=experiment_id,
job_name=experiment_name,
pipeline_id=pipeline_id)
if pipeline_version_id is not None:
run = self._client.run_pipeline(
experiment_id=experiment_id,
job_name=experiment_name,
version_id=pipeline_version_id)
else: # version-less pipelines which are created with tfx <= 0.21.x
pipeline_id = self._get_pipeline_id(pipeline_name)
run = self._client.run_pipeline(
experiment_id=experiment_id,
job_name=experiment_name,
pipeline_id=pipeline_id)

click.echo('Run created for pipeline: ' + pipeline_name)
self._print_runs([run])
Expand All @@ -182,13 +189,13 @@ def list_runs(self) -> None:
"""Lists all runs of a pipeline."""
pipeline_name = self.flags_dict[labels.PIPELINE_NAME]

pipeline_id, experiment_id = self._get_pipeline_id_and_experiment_id(
pipeline_name)
# Check if pipeline exists.
pipeline_id = self._get_pipeline_id(pipeline_name)
self._client._pipelines_api.get_pipeline(pipeline_id) # pylint: disable=protected-access

# List runs.
# TODO(jyzhao): use metadata context to get the run info.
experiment_id = self._get_experiment_id(pipeline_name)
response = self._client.list_runs(experiment_id=experiment_id)

if response and response.runs:
Expand All @@ -214,20 +221,23 @@ def _save_pipeline(self,
pipeline_package_path = self.flags_dict[labels.PIPELINE_PACKAGE_PATH]

if update:
pipeline_id, experiment_id = self._get_pipeline_id_and_experiment_id(
pipeline_name)
pipeline_id = self._get_pipeline_id(pipeline_name)
# A timestamp will be appended for the uniqueness of `version_name`.
version_name = '{}_{}'.format(pipeline_name,
time.strftime('%Y%m%d%H%M%S'))
upload_response = self._client.pipeline_uploads.upload_pipeline_version(
uploadfile=pipeline_package_path,
name=version_name,
pipelineid=pipeline_id)
pipeline_version_id = upload_response.id

experiment_id = self._get_experiment_id(pipeline_name)
else: # creating a new pipeline.
upload_response = self._client.upload_pipeline(
pipeline_package_path=pipeline_package_path,
pipeline_name=pipeline_name)
pipeline_id = upload_response.id
pipeline_version_id = upload_response.default_version.id

# Create experiment with pipeline name as experiment name.
experiment_name = pipeline_name
Expand All @@ -243,6 +253,7 @@ def _save_pipeline(self,
# Add pipeline details to pipeline_args.
pipeline_args[labels.PIPELINE_NAME] = pipeline_name
pipeline_args[labels.PIPELINE_ID] = pipeline_id
pipeline_args[labels.PIPELINE_VERSION_ID] = pipeline_version_id
pipeline_args[labels.PIPELINE_PACKAGE_PATH] = pipeline_package_path
pipeline_args[labels.EXPERIMENT_ID] = experiment_id

Expand Down Expand Up @@ -277,8 +288,8 @@ def _build_pipeline_image(self,
base_image=base_image,
skaffold_cmd=skaffold_cmd).build()

def _get_pipeline_id_and_experiment_id(
self, pipeline_name: Text) -> Tuple[Text, Text]:
def _get_pipeline_args(self, pipeline_name: Text,
arg_name: Text) -> Optional[Text]:
# Path to pipeline folder.
handler_pipeline_path = os.path.join(self._handler_home_dir, pipeline_name)

Expand All @@ -291,8 +302,25 @@ def _get_pipeline_id_and_experiment_id(
# Get the requested argument (e.g. pipeline_id or experiment_id) from pipeline_args.json
with open(pipeline_args_path, 'r') as f:
pipeline_args = json.load(f)
return (pipeline_args[labels.PIPELINE_ID],
pipeline_args[labels.EXPERIMENT_ID])
return pipeline_args.get(arg_name)

def _get_pipeline_id(self, pipeline_name: Text) -> Text:
  """Returns the pipeline id saved for `pipeline_name`.

  Args:
    pipeline_name: Name of the pipeline to look up.

  Returns:
    The value stored under `labels.PIPELINE_ID` in the pipeline args.

  Raises:
    ValueError: If no pipeline id is recorded for the pipeline.
  """
  stored_id = self._get_pipeline_args(pipeline_name, labels.PIPELINE_ID)
  if stored_id is not None:
    return stored_id
  raise ValueError(
      'Cannot find pipeline id for pipeline {}.'.format(pipeline_name))

# NOTE: _get_pipeline_version_id is Optional for backward-compatibility.
def _get_pipeline_version_id(self, pipeline_name: Text) -> Optional[Text]:
  """Returns the saved pipeline version id, or None if it is absent."""
  version_id = self._get_pipeline_args(pipeline_name,
                                       labels.PIPELINE_VERSION_ID)
  return version_id

def _get_experiment_id(self, pipeline_name: Text) -> Text:
  """Returns the experiment id saved for `pipeline_name`.

  Args:
    pipeline_name: Name of the pipeline whose experiment is looked up.

  Returns:
    The value stored under `labels.EXPERIMENT_ID` in the pipeline args.

  Raises:
    ValueError: If no experiment id is recorded for the pipeline.
  """
  stored_id = self._get_pipeline_args(pipeline_name, labels.EXPERIMENT_ID)
  if stored_id is not None:
    return stored_id
  raise ValueError(
      'Cannot find experiment id for pipeline {}.'.format(pipeline_name))

def _print_runs(self, runs):
"""Prints runs in a tabular format with headers mentioned below."""
Expand Down
24 changes: 21 additions & 3 deletions tfx/tools/cli/handler/kubeflow_handler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ def _MockSubprocess(cmd, env): # pylint: disable=invalid-name, unused-argument
return 0


class _MockDefaultVersion(object):

def __init__(self, _id):
self.id = _id


class _MockUploadResponse(object):
"""Mock upload response object."""

Expand All @@ -62,6 +68,7 @@ def __init__(self, config):
self.namespace = config['namespace']
self.id = config['id']
self.name = config['name']
self.default_version = _MockDefaultVersion(config['pipeline_version_id'])


class _MockClientClass(object):
Expand All @@ -73,12 +80,14 @@ def __init__(self, host, client_id, namespace):
'client_id': client_id,
'namespace': namespace,
'id': 'fake_pipeline_id',
'pipeline_version_id': 'fake_pipeline_version_id',
'name': 'fake_pipeline_name'
} # pylint: disable=invalid-name, unused-variable
self._pipelines_api = _MockPipelineApi()
self._experiment_api = _MockExperimentApi()
self._run_api = _MockRunApi()
self.pipeline_uploads = _MockPipielineUploadApi()
self.pipeline_uploads = _MockPipielineUploadApi(
self.config['pipeline_version_id'])

def upload_pipeline(self, pipeline_package_path, pipeline_name): # pylint: disable=invalid-name, unused-argument
return _MockUploadResponse(self.config)
Expand All @@ -89,7 +98,12 @@ def create_experiment(self, name):
def get_experiment(self, experiment_id=None, experiment_name=None): # pylint: disable=unused-argument
return self._experiment_api.get_experiment(experiment_id)

def run_pipeline(self, experiment_id, job_name, pipeline_id=None): # pylint: disable=unused-argument
def run_pipeline(self,
experiment_id,
job_name,
pipeline_id=None,
version_id=None):
del experiment_id, job_name, pipeline_id, version_id
return self._pipelines_api.run_pipeline()

def list_pipelines(self):
Expand Down Expand Up @@ -122,8 +136,12 @@ def run_pipeline(self):

class _MockPipielineUploadApi(object):

def __init__(self, _id):
self.id = _id

def upload_pipeline_version(self, uploadfile, name, pipelineid):
pass
del uploadfile, name, pipelineid
return _MockDefaultVersion(self.id)


class _MockExperimentResponse(object):
Expand Down
2 changes: 2 additions & 0 deletions tfx/tools/cli/labels.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
NAMESPACE = 'namespace'
# Pipeline id generated when pipeline is uploaded to KFP server.
PIPELINE_ID = 'pipeline_id'
# Pipeline version id generated when pipeline is created or updated.
PIPELINE_VERSION_ID = 'pipeline_version_id'
# Experiment id generated when a new experiment is created on KFP server.
EXPERIMENT_ID = 'experiment_id'
# Environment variable for the default Kubeflow TFX image.
Expand Down

0 comments on commit c74ef6b

Please sign in to comment.