Skip to content

Commit

Permalink
SDK: Add PipelineVolumeSnapshots
Browse files Browse the repository at this point in the history
* Add snaps attribute to Pipeline. This is a dict having all the
  PipelineVolumeSnapshots added in the pipeline.
* Extend the processing of the volumes argument in ContainerOp to handle
  PipelineVolumeSnapshots as values.
* Add dsl/pipeline_vsnapshot_tests.py
* Append compiler's templates with the resource templates required
* Extend compiler/compiler_tests.py
* Add samples/volumes/example{4,5}.py

Signed-off-by: Ilias Katsakioris <elikatsis@arrikto.com>
  • Loading branch information
elikatsis committed Mar 6, 2019
1 parent 2382d35 commit f787b13
Show file tree
Hide file tree
Showing 15 changed files with 1,312 additions and 5 deletions.
75 changes: 75 additions & 0 deletions samples/volumes/example4.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import kfp.dsl as dsl


@dsl.pipeline(
name="Example 4",
description="The fourth example of the design doc."
)
def example4(url):
vol1 = dsl.PipelineVolume(
name="vol1",
size="1Gi",
mode=dsl.VOLUME_MODE_RWM
)

step1 = dsl.ContainerOp(
name="step1_ingest",
image="google/cloud-sdk:216.0.0",
command=["sh", "-c"],
arguments=["gsutil cat %s | tee /data/file1.gz" % url],
volumes={"/data": vol1}
)

step1_snap = dsl.PipelineVolumeSnapshot(
step1.volume,
name="snap1"
)

vol2 = dsl.PipelineVolume(
data_source=step1_snap,
name="vol2"
)

step2 = dsl.ContainerOp(
name="step2_gunzip",
image="library/bash:4.4.23",
command=["gunzip", "/data/file1.gz"],
volumes={"/data": vol2}
)

step2_snap = dsl.PipelineVolumeSnapshot(
step2.volume,
name="snap2"
)

vol3 = dsl.PipelineVolume(
data_source=step2_snap,
name="vol3"
)

step3 = dsl.ContainerOp(
name="step3_output",
image="library/bash:4.4.23",
command=["cat", "/data/file1"],
volumes={"/data": vol3}
)


if __name__ == '__main__':
import kfp.compiler as compiler
compiler.Compiler().compile(example4, __file__ + '.tar.gz')
76 changes: 76 additions & 0 deletions samples/volumes/example5.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import kfp.dsl as dsl


@dsl.pipeline(
name="Example 5",
description="The fifth example of the design doc."
)
def example5(rok_url):
vol1 = dsl.PipelineVolume(
name="vol1",
size="1Gi",
annotations={"rok/origin": rok_url},
mode=dsl.VOLUME_MODE_RWM
)

step1 = dsl.ContainerOp(
name="step1_concat",
image="library/bash:4.4.23",
command=["sh", "-c"],
arguments=["cat /data/file*| gzip -c >/data/full.gz"],
volumes={"/data": vol1}
)

step1_snap = dsl.PipelineVolumeSnapshot(
step1.volume,
name="snap1"
)

vol2 = dsl.PipelineVolume(
data_source=step1_snap,
name="vol2"
)

step2 = dsl.ContainerOp(
name="step2_gunzip",
image="library/bash:4.4.23",
command=["gunzip", "-k", "/data/full.gz"],
volumes={"/data": vol2}
)

step2_snap = dsl.PipelineVolumeSnapshot(
step2.volume,
name="snap2"
)

vol3 = dsl.PipelineVolume(
data_source=step2_snap,
name="vol3"
)

step3 = dsl.ContainerOp(
name="step3_output",
image="library/bash:4.4.23",
command=["cat", "/data/full"],
volumes={"/data": vol3}
)


if __name__ == '__main__':
import kfp.compiler as compiler
compiler.Compiler().compile(example5, __file__ + '.tar.gz')
34 changes: 34 additions & 0 deletions sdk/python/kfp/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,37 @@ def _vol_to_template(self, vol):
template["inputs"] = {"parameters": inputs}
return template

def _snap_to_template(self, snap):
output_name = {
"name": "%s-name" % snap.name,
"valueFrom": {
"jsonPath": "{.metadata.name}"
}
}
output_size = {
"name": "%s-size" % snap.name,
"valueFrom": {
"jsonPath": "{.status.restoreSize}"
}
}
template = dict()
template["name"] = snap.name
template["resource"] = dict()
template["resource"]["action"] = "create"
template["resource"]["successCondition"] = "status.readyToUse == true"
template["resource"]["manifest"] = yaml.dump(snap.k8s_resource,
default_flow_style=False)
template["outputs"] = {"parameters": [output_name, output_size]}
inputs = []
for i in snap.inputs:
if i.op_name is None or i.op_name == "":
inputs.append({"name": "%s" % i.name})
else:
inputs.append({"name": "%s-%s" % (i.op_name, i.name)})
if inputs != []:
template["inputs"] = {"parameters": inputs}
return template

def _create_templates(self, pipeline):
"""Create all groups and ops templates in the pipeline."""

Expand All @@ -455,6 +486,9 @@ def _create_templates(self, pipeline):

for vol in pipeline.vols.values():
templates.append(self._vol_to_template(vol))

for snap in pipeline.snaps.values():
templates.append(self._snap_to_template(snap))
return templates

def _create_volumes(self, pipeline):
Expand Down
1 change: 1 addition & 0 deletions sdk/python/kfp/dsl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from ._pipeline import Pipeline, pipeline, get_pipeline_conf
from ._pipeline_volume import (PipelineVolume, VOLUME_MODE_RWO,
VOLUME_MODE_RWM, VOLUME_MODE_ROM)
from ._pipeline_vsnapshot import PipelineVolumeSnapshot
from ._container_op import ContainerOp
from ._ops_group import OpsGroup, ExitHandler, Condition
from ._component import python_component
Expand Down
8 changes: 6 additions & 2 deletions sdk/python/kfp/dsl/_container_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

from . import _pipeline
from . import _pipeline_param
from . import _pipeline_volume
from . import _pipeline_vsnapshot
from ._pipeline_param import _extract_pipelineparams
from kubernetes import client as k8s_client
import re
Expand Down Expand Up @@ -42,8 +44,8 @@ def __init__(self, name: str, image: str, command: str=None, arguments: str=None
one way for outside world to receive outputs of the container.
is_exit_handler: Whether it is used as an exit handler.
volumes: Dictionary for the user to match a path on the op's fs with a
PipelineVolume.
E.g {"/mnt": other_op.volumes["/output"], "/my/path": vol1}.
PipelineVolume or PipelineVolumeSnapshot.
E.g {"/mnt": other_op.volumes["/output"], "/my/path": snap}.
"""

if not _pipeline.Pipeline.get_default_pipeline():
Expand Down Expand Up @@ -90,6 +92,8 @@ def __init__(self, name: str, image: str, command: str=None, arguments: str=None
self.volumes = {}
if volumes:
for mount_path, volume in volumes.items():
if isinstance(volume, _pipeline_vsnapshot.PipelineVolumeSnapshot):
volume = _pipeline_volume.PipelineVolume(data_source=volume)
if volume.from_snapshot:
self.deps.append(volume.name)
else:
Expand Down
4 changes: 4 additions & 0 deletions sdk/python/kfp/dsl/_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from . import _container_op
from ._metadata import PipelineMeta, ParameterMeta, TypeMeta, _annotation_to_typemeta
from . import _pipeline_volume
from . import _pipeline_vsnapshot
from . import _ops_group
from ..compiler._k8s_helper import K8sHelper
import sys
Expand Down Expand Up @@ -130,6 +131,7 @@ def __init__(self, name: str):
self.ops = {}
self.cops = {}
self.vols = {}
self.snaps = {}
self.global_params = set()
# Add the root group.
self.groups = [_ops_group.OpsGroup('pipeline', name=name)]
Expand Down Expand Up @@ -169,6 +171,8 @@ def add_op(self, op, define_only: bool):
self.cops[op_name] = op
elif isinstance(op, _pipeline_volume.PipelineVolume):
self.vols[op_name] = op
elif isinstance(op, _pipeline_vsnapshot.PipelineVolumeSnapshot):
self.snaps[op_name] = op
if not define_only:
self.groups[-1].ops.append(op)

Expand Down
41 changes: 40 additions & 1 deletion sdk/python/kfp/dsl/_pipeline_volume.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@

from . import _pipeline
from . import _pipeline_param
from kubernetes import client as k8s_client
from . import _pipeline_vsnapshot
from ..compiler._k8s_helper import K8sHelper
from kubernetes import client as k8s_client
import re
from typing import List, Dict

Expand Down Expand Up @@ -151,6 +152,36 @@ def __init__(self, name: str = None, pvc: str = None,
self.data_source = data_source
self.data_source_name = data_source
self.from_snapshot = True
if isinstance(data_source,
_pipeline_vsnapshot.PipelineVolumeSnapshot):
if self.data_source.new_snap:
self.deps.add(data_source.name)
self.inputs.append(
_pipeline_param.PipelineParam(
name="name",
op_name=self.data_source.name
)
)
self.inputs.append(
_pipeline_param.PipelineParam(
name="size",
op_name=self.data_source.name
)
)
self.data_source_name = data_source.name
data_source = "{{inputs.parameters.%s-name}}" % \
data_source.name
if size is None:
size_param = True
size = "{{inputs.parameters.%s-size}}" % \
self.data_source.name
else:
if data_source.snapshot is None:
self.data_source_name = data_source.name
data_source = data_source.name
else:
self.data_source_name = data_source.snapshot
data_source = data_source.snapshot
elif size:
self.from_scratch = True

Expand Down Expand Up @@ -316,6 +347,14 @@ def implies(newdep, olddep):

return ret

def snapshot(self, name: str = None, snapshot_class: str = None):
"""Create a snapshot from this PipelineVolume"""
return _pipeline_vsnapshot.PipelineVolumeSnapshot(
pipeline_volume=self,
name=name,
snapshot_class=snapshot_class
)

def _validate_memory_string(self, memory_string):
"""Validate a given string is valid for memory request or limit."""
if re.match(r'^[0-9]+(E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki){0,1}$',
Expand Down
Loading

0 comments on commit f787b13

Please sign in to comment.