Merge pull request #114 from getindata/release-0.6.2

Release 0.6.2

szczeles authored Mar 10, 2022
2 parents 9130b92 + dd7e6f1 commit be5f0b1

Showing 12 changed files with 260 additions and 30 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
@@ -2,6 +2,10 @@

## [Unreleased]

## [0.6.2] - 2022-03-10

- Added support for defining a retry policy for Kubeflow Pipelines nodes

## [0.6.1] - 2022-03-07

- Fixed support for parameters of type `datetime.date`
@@ -123,7 +127,9 @@
- Method to schedule runs for the most recent version of a given pipeline: `kedro kubeflow schedule`
- Shortcut to open the UI for pipelines using `kedro kubeflow ui`

[Unreleased]: https://github.com/getindata/kedro-kubeflow/compare/0.6.1...HEAD
[Unreleased]: https://github.com/getindata/kedro-kubeflow/compare/0.6.2...HEAD

[0.6.2]: https://github.com/getindata/kedro-kubeflow/compare/0.6.1...0.6.2

[0.6.1]: https://github.com/getindata/kedro-kubeflow/compare/0.6.0...0.6.1

56 changes: 51 additions & 5 deletions docs/source/02_installation/02_configuration.md
@@ -15,15 +15,18 @@ run_config:
# Pull policy to be used for the steps. Use Always if you push the images
# on the same tag, or Never if you use only local images
image_pull_policy: IfNotPresent

# Location of the Vertex AI GCS root, required only for the Vertex AI pipelines configuration
root: bucket_name/gcs_suffix

# Name of the kubeflow experiment to be created
experiment_name: Kubeflow Plugin Demo
experiment_name: Kubeflow Plugin Demo [${branch_name|local}]

# Name of the run for run-once, templated with the run-once parameters
run_name: Kubeflow Plugin Demo Run ${pipeline_name} ${branch_name|local} ${commit_id|local}

# Name of the run for run-once
run_name: Kubeflow Plugin Demo Run
# Name of the scheduled run, templated with the schedule parameters
scheduled_run_name: Kubeflow Plugin Demo Recurring Run ${pipeline_name}

# Optional pipeline description
description: Very Important Pipeline
@@ -35,7 +38,35 @@ run_config:
# volume after pipeline finishes) [in seconds]. Default: 1 week
ttl: 604800

# Optional volume specification
# Which Kedro pipeline should be run as the last step, regardless of the
# pipeline status. Used to send notifications or raise alerts
# on_exit_pipeline: notify_via_slack

# This sets the caching option for the pipeline using
# execution_options.caching_strategy.max_cache_staleness
# See the 'Duration' section of https://en.wikipedia.org/wiki/ISO_8601
#max_cache_staleness: P0D

# Set to false to disable exposing Kedro outputs as kfp artifacts
# This setting can be useful if you don't want to store
# intermediate results in the MLMD
#store_kedro_outputs_as_kfp_artifacts: True

# Strategy used to generate Kubeflow pipeline nodes from Kedro nodes
# Available strategies:
# * none (default) - nodes in the Kedro pipeline are mapped to separate nodes
#                    in Kubeflow Pipelines. This strategy allows inspecting
#                    the whole processing graph in the Kubeflow UI and overriding
#                    resources for each node (because they run in separate pods).
#                    However, performance may not be optimal, as intermediate
#                    datasets may have to be shared through disk.
# * full - nodes in the Kedro pipeline are mapped to one node in Kubeflow Pipelines.
#          This strategy mitigates the potential performance issues of the `none`
#          strategy, but at the cost of a degraded user experience in the Kubeflow UI:
#          the graph is collapsed to one node.
#node_merge_strategy: none

# Optional volume specification (not used for Vertex AI)
volume:

# Storage class - use null (or no value) to use the default storage
@@ -85,6 +116,21 @@
__default__:
cpu: 200m
memory: 64Mi

# Optional section to provide retry policy for the steps
# and default policy for steps with no policy specified
retry_policy:
# 90 retries every 5 minutes
wait_for_partition_availability:
num_retries: 90
backoff_duration: 5m
backoff_factor: 1

# 4 retries after: 1 minute, 2 minutes, 4 minutes, 8 minutes
__default__:
num_retries: 4
backoff_duration: 60s
backoff_factor: 2
```
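
Retry settings are resolved per node: a node-specific entry is merged over `__default__`. Below is a minimal sketch of that resolution, assuming the plugin's `PluginConfig` class (as exercised in `tests/test_config.py`); the node names are only examples:

```python
from kedro_kubeflow.config import PluginConfig

# Hypothetical configuration mirroring the retry_policy section above
cfg = PluginConfig(
    {
        "run_config": {
            "retry_policy": {
                "__default__": {
                    "num_retries": 4,
                    "backoff_duration": "60s",
                    "backoff_factor": 2,
                },
                "wait_for_partition_availability": {
                    "num_retries": 90,
                    "backoff_duration": "5m",
                    "backoff_factor": 1,
                },
            }
        }
    }
)

retry_policy = cfg.run_config.retry_policy

# A node without its own entry falls back to __default__
assert retry_policy.get_for("some_other_node") == {
    "num_retries": 4,
    "backoff_duration": "60s",
    "backoff_factor": 2,
}

# A node with its own entry overrides the defaults
assert retry_policy.get_for("wait_for_partition_availability") == {
    "num_retries": 90,
    "backoff_duration": "5m",
    "backoff_factor": 1,
}
```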
## Dynamic configuration support
2 changes: 1 addition & 1 deletion kedro_kubeflow/__init__.py
@@ -1,3 +1,3 @@
"""kedro_kubeflow."""

version = "0.6.1"
version = "0.6.2"
43 changes: 43 additions & 0 deletions kedro_kubeflow/config.py
@@ -116,6 +116,21 @@
__default__:
cpu: 200m
memory: 64Mi
# Optional section to provide retry policy for the steps
# and default policy for steps with no policy specified
retry_policy:
# 90 retries every 5 minutes
wait_for_partition_availability:
num_retries: 90
backoff_duration: 5m
backoff_factor: 1
# 4 retries after: 1 minute, 2 minutes, 4 minutes, 8 minutes
__default__:
num_retries: 4
backoff_duration: 60s
backoff_factor: 2
"""


@@ -191,6 +206,30 @@ def get_for(self, node_name):
return {**defaults, **node_specific}


class RetryPolicy(Config):
def is_set_for(self, node_name):
return self.get_for(node_name) != {}

def get_for(self, node_name):
defaults = self._get_or_default("__default__", {})
node_specific = self._get_or_default(node_name, {})
values = {**defaults, **node_specific}
if values == {}:
return {}
values["num_retries"] = int(values.get("num_retries", 0))
values["backoff_factor"] = (
float(values["backoff_factor"])
if "backoff_factor" in values
else None
)
values["backoff_duration"] = (
str(values["backoff_duration"])
if "backoff_duration" in values
else None
)
return values


class RunConfig(Config):
@property
def image(self):
@@ -226,6 +265,10 @@ def description(self):
def resources(self):
return NodeResources(self._get_or_default("resources", {}))

@property
def retry_policy(self):
return RetryPolicy(self._get_or_default("retry_policy", {}))

@property
def volume(self):
if "volume" in self._raw.keys():
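A minimal sketch of how the new `RetryPolicy.get_for` merges and type-normalizes values, assuming the class can be constructed directly from a dict as `RunConfig.retry_policy` does; note that string inputs are coerced, as the new test below also checks:

```python
from kedro_kubeflow.config import RetryPolicy

policy = RetryPolicy(
    {
        "__default__": {"num_retries": 4, "backoff_duration": "60s", "backoff_factor": 2},
        # String values are accepted and normalized (int / float / str)
        "node3": {"num_retries": "100", "backoff_duration": "5m", "backoff_factor": 1},
    }
)

assert policy.is_set_for("node3")
assert policy.get_for("node3") == {
    "num_retries": 100,        # "100" -> int
    "backoff_duration": "5m",  # kept as str
    "backoff_factor": 1,       # 1 -> float
}
# Any other node gets the __default__ policy
assert policy.get_for("any_other_node")["num_retries"] == 4
```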
15 changes: 3 additions & 12 deletions kedro_kubeflow/generators/one_pod_pipeline_generator.py
@@ -1,6 +1,5 @@
import logging

import kubernetes.client as k8s
from kfp import dsl

from ..utils import clean_name
@@ -9,6 +8,7 @@
create_command_using_params_dumper,
create_container_environment,
create_pipeline_exit_handler,
customize_op,
maybe_add_params,
)

@@ -47,15 +47,6 @@ def _build_kfp_op(
image,
image_pull_policy,
) -> dsl.ContainerOp:
kwargs = {
"env": create_container_environment(),
"image_pull_policy": image_pull_policy,
}
default_resources = self.run_config.resources.get_for("__default__")
if default_resources:
kwargs["resources"] = k8s.V1ResourceRequirements(
limits=default_resources, requests=default_resources
)
container_op = dsl.ContainerOp(
name=clean_name(pipeline),
image=image,
@@ -69,7 +60,7 @@ def _build_kfp_op(
arguments=create_arguments_from_parameters(
self.context.params.keys()
),
container_kwargs=kwargs,
container_kwargs={"env": create_container_environment()},
file_outputs={
output: f"/home/kedro/{self.catalog[output]['filepath']}"
for output in self.catalog
@@ -82,4 +73,4 @@ def _build_kfp_op(
self.run_config.max_cache_staleness
)

return container_op
return customize_op(container_op, image_pull_policy, self.run_config)
9 changes: 1 addition & 8 deletions kedro_kubeflow/generators/pod_per_node_pipeline_generator.py
@@ -114,13 +114,6 @@ def _build_kfp_ops(

for node in node_dependencies:
name = clean_name(node.name)
kwargs = {"env": nodes_env}
if self.run_config.resources.is_set_for(node.name):
kwargs["resources"] = k8s.V1ResourceRequirements(
limits=self.run_config.resources.get_for(node.name),
requests=self.run_config.resources.get_for(node.name),
)

kfp_ops[node.name] = customize_op(
dsl.ContainerOp(
name=name,
@@ -137,7 +130,7 @@
self.context.params.keys()
),
pvolumes=node_volumes,
container_kwargs=kwargs,
container_kwargs={"env": nodes_env},
file_outputs={
output: "/home/kedro/"
+ self.catalog[output]["filepath"]
10 changes: 10 additions & 0 deletions kedro_kubeflow/generators/utils.py
@@ -127,4 +127,14 @@ def customize_op(op, image_pull_policy, run_config):
op.container.set_security_context(
k8s.V1SecurityContext(run_as_user=run_config.volume.owner)
)

if run_config.resources.is_set_for(op.name):
op.container.resources = k8s.V1ResourceRequirements(
limits=run_config.resources.get_for(op.name),
requests=run_config.resources.get_for(op.name),
)
if run_config.retry_policy.is_set_for(op.name):
op.set_retry(
policy="Always", **run_config.retry_policy.get_for(op.name)
)
return op
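
For reference, a minimal self-contained sketch of the plain kfp v1 calls that `customize_op` now issues for resources and retries; the function name and arguments here are illustrative, not part of the plugin API:

```python
import kubernetes.client as k8s
from kfp import dsl


def apply_node_settings(op: dsl.ContainerOp, resources: dict, retry: dict) -> dsl.ContainerOp:
    # Limits and requests are set to the same values, as customize_op does
    if resources:
        op.container.resources = k8s.V1ResourceRequirements(
            limits=resources, requests=resources
        )
    # Retries always use the "Always" policy; num_retries, backoff_duration
    # and backoff_factor come from the retry_policy section of the config
    if retry:
        op.set_retry(policy="Always", **retry)
    return op
```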
2 changes: 1 addition & 1 deletion setup.cfg
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.6.1
current_version = 0.6.2

[bumpversion:file:setup.py]

4 changes: 2 additions & 2 deletions setup.py
@@ -28,7 +28,7 @@
"docs": [
"sphinx==3.4.2",
"recommonmark==0.7.1",
"sphinx_rtd_theme==0.6.1",
"sphinx_rtd_theme==0.6.2",
],
"vertexai": [
"google-cloud-scheduler>=2.3.2",
@@ -37,7 +37,7 @@

setup(
name="kedro-kubeflow",
version="0.6.1",
version="0.6.2",
description="Kedro plugin with Kubeflow support",
long_description=README,
long_description_content_type="text/markdown",
32 changes: 32 additions & 0 deletions tests/test_config.py
@@ -130,3 +130,35 @@ def test_reuse_run_name_for_scheduled_run_name(self):
cfg = PluginConfig({"run_config": {"run_name": "some run"}})
assert cfg.run_config.run_name == "some run"
assert cfg.run_config.scheduled_run_name == "some run"

def test_retry_policy_default_and_node_specific(self):
cfg = PluginConfig(
{
"run_config": {
"retry_policy": {
"__default__": {
"num_retries": 4,
"backoff_duration": "60s",
"backoff_factor": 2,
},
"node3": {
"num_retries": "100",
"backoff_duration": "5m",
"backoff_factor": 1,
},
}
}
}
)
assert cfg.run_config.retry_policy.is_set_for("node2")
assert cfg.run_config.retry_policy.get_for("node2") == {
"backoff_duration": "60s",
"backoff_factor": 2,
"num_retries": 4,
}
assert cfg.run_config.retry_policy.is_set_for("node3")
assert cfg.run_config.retry_policy.get_for("node3") == {
"backoff_duration": "5m",
"backoff_factor": 1,
"num_retries": 100,
}
51 changes: 51 additions & 0 deletions tests/test_one_pod_pipeline_generator.py
@@ -104,6 +104,57 @@ def test_should_add_resources_spec(self):
assert resources.limits == {"cpu": "100m", "memory": "8Gi"}
assert resources.requests == {"cpu": "100m", "memory": "8Gi"}

def test_should_not_add_retry_policy_if_not_requested(self):
# given
self.create_generator(config={})

# when
with kfp.dsl.Pipeline(None) as dsl_pipeline:
self.generator_under_test.generate_pipeline(
"pipeline", "unittest-image", "Always"
)()

# then
op = dsl_pipeline.ops["pipeline"]
assert op.num_retries == 0
assert op.retry_policy is None
assert op.backoff_factor is None
assert op.backoff_duration is None
assert op.backoff_max_duration is None

def test_should_add_retry_policy(self):
# given
self.create_generator(
config={
"retry_policy": {
"__default__": {
"num_retries": 4,
"backoff_duration": "60s",
"backoff_factor": 2,
},
"node1": {
"num_retries": 100,
"backoff_duration": "5m",
"backoff_factor": 1,
},
}
}
)

# when
with kfp.dsl.Pipeline(None) as dsl_pipeline:
self.generator_under_test.generate_pipeline(
"pipeline", "unittest-image", "Always"
)()

# then
op = dsl_pipeline.ops["pipeline"]
assert op.num_retries == 4
assert op.retry_policy == "Always"
assert op.backoff_factor == 2
assert op.backoff_duration == "60s"
assert op.backoff_max_duration is None

def test_should_set_description(self):
# given
self.create_generator(config={"description": "DESC"})