Support for Kubernetes Jobs #1906

Merged
merged 26 commits into from
Feb 24, 2017
Changes from 14 commits
Commits
7619a9c
add k8s support
Oct 29, 2016
b0f5a4a
task definition simplified
Oct 31, 2016
f08b9f7
remove sys import
Oct 31, 2016
a2685a2
Add python 2 warning
Oct 31, 2016
bc94870
add tests and comment
Nov 1, 2016
82ca272
add tests and fix bugs
Nov 1, 2016
da9ec93
add support for ServiceAccount
Nov 22, 2016
0c553e8
print auth method as debug message
Nov 23, 2016
078bc58
fix logging
Nov 23, 2016
6cb5317
add output req, update test+example, get logger
henryrizzi Jan 4, 2017
acc97d9
make output more flexible, change spelling+consistency, add config in…
henryrizzi Jan 5, 2017
37f7733
add support for custom job labels
henryrizzi Jan 5, 2017
a042c93
use mock for testing signal_complete behavior + reduce job requirements
henryrizzi Jan 18, 2017
adef08e
Merge pull request #1 from henryrizzi/feature/k8s-task-changes
mcapuccini Jan 19, 2017
8b7f615
name changes and config addition due to feedback
henryrizzi Jan 25, 2017
eda6421
Merge pull request #2 from henryrizzi/feature/k8s-task-changes
mcapuccini Jan 25, 2017
d920cdf
Update restart policy in job_json to make it effective when spec_sche…
mcapuccini-ci Feb 18, 2017
24c25ac
Merge pull request #4 from mcapuccini/fix/restart-policy
mcapuccini Feb 18, 2017
a2023ee
attempt to fix travis doc check
mcapuccini-ci Feb 19, 2017
e64e4f8
Revert "attempt to fix travis doc check"
mcapuccini Feb 19, 2017
e06adb0
skip tests when pykube is not available
Feb 20, 2017
c9e4ff4
fix documentation
Feb 20, 2017
9de1222
fix bullet list
Feb 20, 2017
cf8fc8e
fix bullet list, attempt 2
Feb 20, 2017
2429850
fix output documentation
Feb 20, 2017
2c4c340
fix config documentation
Feb 20, 2017
23 changes: 23 additions & 0 deletions doc/configuration.rst
@@ -382,6 +382,29 @@ release
"cdh4".


[k8s]
Contributor

Can you just spell out kubernetes? Let's choose slightly wordy over unclear. :)

Contributor

Both k8s and Kubernetes are mutually intelligible to Kubernetes users, but I guess that would be better for continuity.

What kind of naming convention would you suggest for files though? Should the examples and code be called kubernetes_job.py and the test kubernetes_job_test.py, or does that not matter as much?

Contributor

> Both k8s and Kubernetes are mutually intelligible to Kubernetes users, but I guess that would be better for continuity.

I want luigi users who don't know Kubernetes to be able to read the docs without any surprises as well. :)

Contributor

+1

-------

Parameters controlling Kubernetes Job Tasks

auth_method
  Authentication method used to access the cluster.
  Options are "kubeconfig_" or "service-account_".

kubeconfig_path
  Path to kubeconfig file, for cluster authentication.
  It defaults to ``~/.kube/config``, which is the default location when
  using minikube_.
  When auth_method is "service-account" this property is ignored.

max_retrials
  Maximum number of retrials in case of job failure.

.. _service-account: http://kubernetes.io/docs/user-guide/service-accounts
.. _kubeconfig: http://kubernetes.io/docs/user-guide/kubeconfig-file
.. _minikube: http://kubernetes.io/docs/getting-started-guides/minikube
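
To make the defaults above concrete, here is a minimal sketch of how the three options could resolve when some of them are missing from the config file. It uses only the standard library's `configparser`, not Luigi's own configuration module, and the helper name `read_k8s_options` and the sample file contents are assumptions made for illustration. The default values are the ones returned by the properties in `luigi/contrib/k8s_job.py` in this diff.

```python
import configparser

# Hypothetical config file contents; only two of the three options are set.
SAMPLE_CFG = """
[k8s]
auth_method = service-account
max_retrials = 3
"""

# Defaults as stated in the k8s_job.py properties of this pull request.
DEFAULTS = {
    "auth_method": "kubeconfig",
    "kubeconfig_path": "~/.kube/config",
    "max_retrials": "0",
}


def read_k8s_options(cfg_text):
    """Return the [k8s] options, falling back to the documented defaults."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    section = parser["k8s"] if parser.has_section("k8s") else {}
    options = {key: section.get(key, default) for key, default in DEFAULTS.items()}
    # Config files store strings; the retry limit is compared with an
    # integer failure count, so convert it explicitly.
    options["max_retrials"] = int(options["max_retrials"])
    return options


options = read_k8s_options(SAMPLE_CFG)
```

With the sample above, `kubeconfig_path` falls back to its default, and it would be ignored anyway because `auth_method` is `service-account`.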


[mysql]
-------

65 changes: 65 additions & 0 deletions examples/kubernetes_job.py
@@ -0,0 +1,65 @@
# -*- coding: utf-8 -*-
#
# Copyright 2015 Outlier Bio, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
Contributor

Can you add module docs, similar to those of the execution summary example?

Contributor

Is the docstring below good?

Contributor

Yes. I think this is very good. :)

"""
Example Kubernetes Job Task.

Requires:

- pykube: ``pip install pykube``
- A local minikube cluster up and running: http://kubernetes.io/docs/getting-started-guides/minikube/

**WARNING**: For Python versions < 3.5 the kubeconfig file must point to a Kubernetes API
hostname, and NOT to an IP address.

You can run this code example like this:

.. code:: console

    $ luigi --module examples.kubernetes_job PerlPi --local-scheduler

Running this code will create a pi-luigi-uuid kubernetes job within the cluster
pointed to by the default context in "~/.kube/config".

If running within a kubernetes cluster, set auth_method = "service-account" to
access the local cluster.
"""

# import os
# import luigi
from luigi.contrib.k8s_job import KubernetesJobTask


class PerlPi(KubernetesJobTask):

    name = "pi"
    max_retrials = 3
    spec_schema = {
        "containers": [{
            "name": "pi",
            "image": "perl",
            "command": ["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"]
        }]
    }

    # defining the two functions below allows for dependency checking,
    # but isn't a requirement
    # def signal_complete(self):
    #     with self.output().open('w') as output:
    #         output.write('')
    #
    # def output(self):
    #     target = os.path.join("/tmp", "PerlPi")
    #     return luigi.LocalTarget(target)
231 changes: 231 additions & 0 deletions luigi/contrib/k8s_job.py
@@ -0,0 +1,231 @@
# -*- coding: utf-8 -*-
#
# Copyright 2015 Outlier Bio, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


"""
Kubernetes Job wrapper for Luigi.

From the Kubernetes website:

Kubernetes is an open-source system for automating deployment, scaling,
and management of containerized applications.

For more information about Kubernetes Jobs: http://kubernetes.io/docs/user-guide/jobs/

Requires:

- pykube: ``pip install pykube``

Written and maintained by Marco Capuccini (@mcapuccini).
"""

import luigi
from luigi import configuration
import logging
import uuid
import time

logger = logging.getLogger('luigi-interface')

try:
    from pykube.config import KubeConfig
    from pykube.http import HTTPClient
    from pykube.objects import Job
except ImportError:
    logger.warning('pykube is not installed. KubernetesJobTask requires pykube.')


class KubernetesJobTask(luigi.Task):

    __POLL_TIME = 5  # see __track_job

    def _init_k8s(self):
        self.__logger = logger
        self.__logger.debug("Kubernetes auth method: " + self.auth_method)
        if(self.auth_method == "kubeconfig"):
            self.__kube_api = HTTPClient(KubeConfig.from_file(self.kubeconfig_path))
        elif(self.auth_method == "service-account"):
            self.__kube_api = HTTPClient(KubeConfig.from_service_account())
        else:
            raise ValueError("Illegal auth_method")
        self.job_uuid = str(uuid.uuid4().hex)
        self.uu_name = self.name + "-luigi-" + self.job_uuid
        if ("restartPolicy" not in self.spec_schema):
            self.spec_schema["restartPolicy"] = "Never"

    @property
    def auth_method(self):
        """
        This can be set to ``kubeconfig`` or ``service-account``.
        It defaults to ``kubeconfig``.

        For more details, please refer to:

        - kubeconfig: http://kubernetes.io/docs/user-guide/kubeconfig-file
        - service-account: http://kubernetes.io/docs/user-guide/service-accounts
        """
        return configuration.get_config().get("k8s", "auth_method", "kubeconfig")
Contributor

Please change all calls to configuration.get_config() to use luigi Config classes instead. There are some docs here: http://luigi.readthedocs.io/en/stable/api/luigi.configuration.html

Contributor

At the top there's an import, `from luigi import configuration`. Would you prefer calling `luigi.configuration`?

Contributor

No no, "Config classes" is a concept in luigi. Here's docs and an example

Contributor

Oh, a final thing, since this is a new class, there's no need for the config_path kwarg from the example.


    @property
    def kubeconfig_path(self):
        """
        Path to kubeconfig file used for cluster authentication.
        It defaults to "~/.kube/config", which is the default location
        when using minikube (http://kubernetes.io/docs/getting-started-guides/minikube).
        When auth_method is ``service-account`` this property is ignored.

        **WARNING**: For Python versions < 3.5 kubeconfig must point to a Kubernetes API
        hostname, and NOT to an IP address.

        For more details, please refer to:
        http://kubernetes.io/docs/user-guide/kubeconfig-file
        """
        return configuration.get_config().get("k8s", "kubeconfig_path", "~/.kube/config")

    @property
    def name(self):
        """
        A name for this job. This task will automatically append a UUID to the
        name before submitting it to Kubernetes.
        """
        raise NotImplementedError("subclass must define name")

    @property
    def labels(self):
        """
        Return custom labels for kubernetes job.
        example::
            ``{"run_dt": datetime.date.today().strftime('%F')}``
Contributor

Oops. This shouldn't be indented according to the doc compiler.

        """
        return {}

    @property
    def spec_schema(self):
        """
        Kubernetes Job spec schema in JSON format, example::
        .. code-block:: javascript

            {
                "containers": [{
                    "name": "pi",
                    "image": "perl",
                    "command": ["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"]
                }],
                "restartPolicy": "Never"
            }

        **restartPolicy**

        - If restartPolicy is not defined, it will be set to "Never" by default.

        - **Warning**: restartPolicy=OnFailure will bypass max_retrials, and restart
          the container until success, with the risk of blocking the Luigi task.

        For more information, please refer to:
        http://kubernetes.io/docs/user-guide/pods/multi-container/#the-spec-schema
        """
        raise NotImplementedError("subclass must define spec_schema")

    @property
    def max_retrials(self):
        """
        Maximum number of retrials in case of failure.
        """
        return configuration.get_config().get("k8s", "max_retrials", 0)

    def __track_job(self):
        """Poll job status while active"""
        while (self.__get_job_status() == "running"):
            self.__logger.debug("Kubernetes job " + self.uu_name
                                + " is still running")
            time.sleep(self.__POLL_TIME)
        if(self.__get_job_status() == "succeeded"):
            self.__logger.info("Kubernetes job " + self.uu_name + " succeeded")
Contributor

I think it would make sense to touch some sort of output at this point to signal job completion.

Contributor

Maybe something along the lines of:

with self.output().open('w') as output_file:
    output_file.write('')

Just to touch the required output file.

            # Use signal_complete to notify of job completion
            self.signal_complete()
        else:
            raise RuntimeError("Kubernetes job " + self.uu_name + " failed")
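
The polling behaviour of `__track_job` can be tried without a cluster. The following is only a sketch of the same idea, not the code in this pull request: the function name `track` and its injectable `get_status` and `sleep` arguments are assumptions added so the loop can be driven by a fake status sequence. Unlike the method above, it queries the status once per iteration and reuses the last value instead of calling the status function again after the loop.

```python
import time


def track(get_status, poll_time=5, sleep=time.sleep):
    """Poll get_status() until it stops returning "running".

    Returns "succeeded" on success and raises RuntimeError otherwise.
    """
    status = get_status()
    while status == "running":
        sleep(poll_time)
        status = get_status()
    if status != "succeeded":
        raise RuntimeError("Kubernetes job failed")
    return status


# Drive the loop with a canned status sequence and record the sleeps
# instead of actually waiting.
statuses = iter(["running", "running", "succeeded"])
naps = []
result = track(lambda: next(statuses), poll_time=5, sleep=naps.append)
```

Injecting `sleep` is also roughly what a unit test would need in order to exercise the loop without waiting for the poll interval.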

    def signal_complete(self):
        """Signal job completion for scheduler and dependent tasks.

        Touching a system file is an easy way to signal completion. example::
        .. code-block:: python

            with self.output().open('w') as output_file:
                output_file.write('')
        """
        pass

    def __get_job_status(self):
        """Return the Kubernetes job status"""
        # Look for the required job
        jobs = Job.objects(self.__kube_api).filter(selector="luigi_task_id="
                                                            + self.job_uuid)
        # Raise an exception if no such job found
        if len(jobs.response["items"]) == 0:
            raise RuntimeError("Kubernetes job " + self.uu_name + " not found")
        # Figure out status and return it
        job = Job(self.__kube_api, jobs.response["items"][0])
        if ("succeeded" in job.obj["status"] and job.obj["status"]["succeeded"] > 0):
            job.scale(replicas=0)  # Downscale the job, but keep it for logging
            return "succeeded"
        if ("failed" in job.obj["status"]):
            failed_cnt = job.obj["status"]["failed"]
            self.__logger.debug("Kubernetes job " + self.uu_name
                                + " status.failed: " + str(failed_cnt))
            if (failed_cnt > self.max_retrials):
                job.scale(replicas=0)  # avoid more retrials
                return "failed"
        return "running"
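
The decision made by `__get_job_status` depends only on the job's `status` mapping and on `max_retrials`, so it can be illustrated as a pure function. This is a sketch under that assumption: the name `classify_job_status` is hypothetical, and the pykube calls (the label-selector lookup and `job.scale(replicas=0)`) are deliberately left out.

```python
def classify_job_status(status, max_retrials):
    """Map a Kubernetes Job status dict to "succeeded", "failed" or "running".

    Mirrors the checks in __get_job_status: any successful pod means success,
    and the job counts as failed only once the number of failed pods exceeds
    max_retrials.
    """
    if status.get("succeeded", 0) > 0:
        return "succeeded"
    if status.get("failed", 0) > max_retrials:
        return "failed"
    return "running"


# With max_retrials = 3, three failed pods still count as "running";
# the fourth failure tips the job over to "failed".
examples = [
    classify_job_status({"active": 1}, 3),
    classify_job_status({"failed": 3, "active": 1}, 3),
    classify_job_status({"failed": 4}, 3),
    classify_job_status({"succeeded": 1, "failed": 2}, 3),
]
```

Note that with the default `max_retrials` of 0, a single failed pod is enough to report the job as failed.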

Contributor

I think adding this to the method would be really helpful, and fixes an error that I had while testing (there's no way to tell that the k8s job is complete)

def output(self):
    """Implement an output to allow for dependency chaining"""
    raise NotImplementedError("Subclass must define output")

    def run(self):
        self._init_k8s()
        # Submit the Job
        self.__logger.info("Submitting Kubernetes Job: " + self.uu_name)
        job_json = {
            "apiVersion": "batch/v1",
            "kind": "Job",
            "metadata": {
                "name": self.uu_name,
                "labels": {
                    "spawned_by": "luigi",
                    "luigi_task_id": self.job_uuid
                }
            },
            "spec": {
                "template": {
                    "metadata": {
                        "name": self.uu_name
                    },
                    "spec": self.spec_schema
                }
            }
        }
        job_json['metadata']['labels'].update(self.labels)
        job = Job(self.__kube_api, job_json)
        job.create()
        # Track the Job (wait while active)
        self.__logger.info("Start tracking Kubernetes Job: " + self.uu_name)
        self.__track_job()

    def output(self):
        """An output target is necessary for checking job completion unless
        an alternative complete method is defined.
        example::
        ``return luigi.LocalTarget(os.path.join('/tmp', 'example'))``"""
        pass
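
For readers who want to see what `run()` ends up submitting, the manifest construction can be reproduced without pykube or a cluster. The sketch below is an illustration only: `build_job_manifest` is a hypothetical helper, not part of this pull request. It follows the same steps as `_init_k8s` and `run()` (UUID suffix, default `restartPolicy`, fixed labels plus custom ones), with one deliberate difference: it copies the spec instead of modifying the caller's dictionary in place.

```python
import uuid


def build_job_manifest(name, spec_schema, labels=None):
    """Build a batch/v1 Job manifest the way run() assembles job_json."""
    job_uuid = uuid.uuid4().hex
    uu_name = name + "-luigi-" + job_uuid
    # Copy the pod spec so the caller's dict is left untouched, then apply
    # the same default as _init_k8s.
    pod_spec = dict(spec_schema)
    pod_spec.setdefault("restartPolicy", "Never")
    job_labels = {"spawned_by": "luigi", "luigi_task_id": job_uuid}
    job_labels.update(labels or {})
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"name": uu_name, "labels": job_labels},
        "spec": {
            "template": {
                "metadata": {"name": uu_name},
                "spec": pod_spec,
            }
        },
    }


spec = {
    "containers": [{
        "name": "pi",
        "image": "perl",
        "command": ["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
    }]
}
manifest = build_job_manifest("pi", spec, labels={"run_dt": "2017-01-05"})
```

The `luigi_task_id` label carries the same UUID that is appended to the job name, which is what the label selector in `__get_job_status` relies on to find the job again.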