Skip to content

Commit

Permalink
chore(sdk): Fix a bug where we missed injecting importer node (#4712)
Browse files Browse the repository at this point in the history
* Fix bug where we missed injecting importer node

* moved files

* address review comments
  • Loading branch information
chensun committed Nov 5, 2020
1 parent 92a932e commit 935a9b5
Show file tree
Hide file tree
Showing 7 changed files with 268 additions and 180 deletions.
12 changes: 8 additions & 4 deletions sdk/python/kfp/v2/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from kfp.compiler._k8s_helper import sanitize_k8s_name
from kfp.components import _python_op
from kfp.v2 import dsl
from kfp.v2.compiler import importer_node
from kfp.v2.dsl import importer_node
from kfp.v2.dsl import type_utils
from kfp.v2.proto import pipeline_spec_pb2

Expand Down Expand Up @@ -114,18 +114,22 @@ def _create_pipeline_spec(
# Check if need to insert importer node
for input_name in task.inputs.artifacts:
if not task.inputs.artifacts[input_name].producer_task:
artifact_type = type_utils.get_input_artifact_type_schema(
type_schema = type_utils.get_input_artifact_type_schema(
input_name, component_spec.inputs)

importer_task, importer_spec = importer_node.build_importer_spec(
task, input_name, artifact_type)
importer_task = importer_node.build_importer_task_spec(
dependent_task=task,
input_name=input_name,
input_type_schema=type_schema)
importer_tasks.append(importer_task)

task.inputs.artifacts[
input_name].producer_task = importer_task.task_info.name
task.inputs.artifacts[
input_name].output_artifact_key = importer_node.OUTPUT_KEY

# Retrieve the pre-built importer spec
importer_spec = op.importer_spec[input_name]
deployment_config.executors[
importer_task.executor_label].importer.CopyFrom(importer_spec)

Expand Down
81 changes: 0 additions & 81 deletions sdk/python/kfp/v2/compiler/importer_node_test.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@
}
},
"artifacts": {
"input_6": {
"producerTask": "upstream_input_6_importer",
"outputArtifactKey": "result"
},
"input_7": {
"producerTask": "upstream_input_7_importer",
"outputArtifactKey": "result"
Expand All @@ -35,16 +31,20 @@
"producerTask": "upstream_input_5_importer",
"outputArtifactKey": "result"
},
"input_3": {
"producerTask": "upstream_input_3_importer",
"outputArtifactKey": "result"
},
"input_4": {
"producerTask": "upstream_input_4_importer",
"outputArtifactKey": "result"
},
"input_8": {
"producerTask": "upstream_input_8_importer",
"input_6": {
"producerTask": "upstream_input_6_importer",
"outputArtifactKey": "result"
},
"input_3": {
"producerTask": "upstream_input_3_importer",
"input_8": {
"producerTask": "upstream_input_8_importer",
"outputArtifactKey": "result"
}
}
Expand Down Expand Up @@ -84,21 +84,21 @@
}
},
"artifacts": {
"input_c": {
"producerTask": "upstream",
"outputArtifactKey": "output_3"
},
"input_b": {
"producerTask": "upstream",
"outputArtifactKey": "output_2"
},
"input_c": {
"producerTask": "upstream",
"outputArtifactKey": "output_3"
}
}
},
"executorLabel": "downstream"
},
{
"taskInfo": {
"name": "upstream_input_6_importer"
"name": "upstream_input_7_importer"
},
"outputs": {
"artifacts": {
Expand All @@ -109,37 +109,37 @@
}
}
},
"executorLabel": "upstream_input_6_importer"
"executorLabel": "upstream_input_7_importer"
},
{
"taskInfo": {
"name": "upstream_input_7_importer"
"name": "upstream_input_5_importer"
},
"outputs": {
"artifacts": {
"result": {
"artifactType": {
"instanceSchema": "title: kfp.Artifact\ntype: object\nproperties:\n"
"instanceSchema": "title: kfp.Metrics\ntype: object\nproperties:\n"
}
}
}
},
"executorLabel": "upstream_input_7_importer"
"executorLabel": "upstream_input_5_importer"
},
{
"taskInfo": {
"name": "upstream_input_5_importer"
"name": "upstream_input_3_importer"
},
"outputs": {
"artifacts": {
"result": {
"artifactType": {
"instanceSchema": "title: kfp.Metrics\ntype: object\nproperties:\n"
"instanceSchema": "title: kfp.Artifact\ntype: object\nproperties:\n"
}
}
}
},
"executorLabel": "upstream_input_5_importer"
"executorLabel": "upstream_input_3_importer"
},
{
"taskInfo": {
Expand All @@ -158,7 +158,7 @@
},
{
"taskInfo": {
"name": "upstream_input_8_importer"
"name": "upstream_input_6_importer"
},
"outputs": {
"artifacts": {
Expand All @@ -169,11 +169,11 @@
}
}
},
"executorLabel": "upstream_input_8_importer"
"executorLabel": "upstream_input_6_importer"
},
{
"taskInfo": {
"name": "upstream_input_3_importer"
"name": "upstream_input_8_importer"
},
"outputs": {
"artifacts": {
Expand All @@ -184,36 +184,16 @@
}
}
},
"executorLabel": "upstream_input_3_importer"
"executorLabel": "upstream_input_8_importer"
}
],
"deploymentConfig": {
"@type": "type.googleapis.com/ml_pipelines.PipelineDeploymentConfig",
"executors": {
"upstream_input_7_importer": {
"importer": {
"artifactUri": {
"runtimeParameter": "input7"
},
"typeSchema": {
"instanceSchema": "title: kfp.Artifact\ntype: object\nproperties:\n"
}
}
},
"upstream_input_5_importer": {
"importer": {
"artifactUri": {
"runtimeParameter": "input5"
},
"typeSchema": {
"instanceSchema": "title: kfp.Metrics\ntype: object\nproperties:\n"
}
}
},
"upstream_input_8_importer": {
"upstream_input_6_importer": {
"importer": {
"artifactUri": {
"runtimeParameter": "input8"
"runtimeParameter": "input6"
},
"typeSchema": {
"instanceSchema": "title: kfp.Artifact\ntype: object\nproperties:\n"
Expand All @@ -238,16 +218,6 @@
]
}
},
"upstream_input_4_importer": {
"importer": {
"artifactUri": {
"runtimeParameter": "input4"
},
"typeSchema": {
"instanceSchema": "title: kfp.Artifact\ntype: object\nproperties:\n"
}
}
},
"upstream_input_3_importer": {
"importer": {
"artifactUri": {
Expand All @@ -268,10 +238,42 @@
]
}
},
"upstream_input_6_importer": {
"upstream_input_4_importer": {
"importer": {
"artifactUri": {
"runtimeParameter": "input6"
"runtimeParameter": "input4"
},
"typeSchema": {
"instanceSchema": "title: kfp.Artifact\ntype: object\nproperties:\n"
}
}
},
"upstream_input_8_importer": {
"importer": {
"artifactUri": {
"runtimeParameter": "input8"
},
"typeSchema": {
"instanceSchema": "title: kfp.Artifact\ntype: object\nproperties:\n"
}
}
},
"upstream_input_5_importer": {
"importer": {
"artifactUri": {
"constantValue": {
"stringValue": "gs://bucket/metrics"
}
},
"typeSchema": {
"instanceSchema": "title: kfp.Metrics\ntype: object\nproperties:\n"
}
}
},
"upstream_input_7_importer": {
"importer": {
"artifactUri": {
"runtimeParameter": "input7"
},
"typeSchema": {
"instanceSchema": "title: kfp.Artifact\ntype: object\nproperties:\n"
Expand All @@ -280,13 +282,13 @@
}
}
},
"sdkVersion": "kfp-1.0.1-dev20201029",
"sdkVersion": "kfp-1.1.0-alpha.1",
"schemaVersion": "v2alpha1",
"runtimeParameters": {
"input6": {
"input8": {
"type": "STRING",
"defaultValue": {
"stringValue": "gs://bucket/dataset"
"stringValue": "gs://path2"
}
},
"input7": {
Expand All @@ -295,16 +297,10 @@
"stringValue": "arbitrary value"
}
},
"input5": {
"type": "STRING",
"defaultValue": {
"stringValue": "gs://bucket/metrics"
}
},
"input8": {
"input6": {
"type": "STRING",
"defaultValue": {
"stringValue": "gs://path2"
"stringValue": "gs://bucket/dataset"
}
}
}
Expand Down
Loading

0 comments on commit 935a9b5

Please sign in to comment.