From 6ce970846c0ca8c2850bc168b686340e15982b2b Mon Sep 17 00:00:00 2001 From: Marco Capuccini Date: Fri, 24 Feb 2017 02:56:07 +0100 Subject: [PATCH] Support for Kubernetes Jobs (#1906) --- doc/configuration.rst | 23 +++ examples/kubernetes.py | 65 +++++++++ luigi/contrib/kubernetes.py | 251 ++++++++++++++++++++++++++++++++ test/contrib/kubernetes_test.py | 103 +++++++++++++ 4 files changed, 442 insertions(+) create mode 100644 examples/kubernetes.py create mode 100644 luigi/contrib/kubernetes.py create mode 100644 test/contrib/kubernetes_test.py diff --git a/doc/configuration.rst b/doc/configuration.rst index e94c14588b..4e44d9b6f1 100644 --- a/doc/configuration.rst +++ b/doc/configuration.rst @@ -477,6 +477,29 @@ release "cdh4". +[kubernetes] +------------ + +Parameters controlling Kubernetes Job Tasks + +auth_method + Authorization method to access the cluster. + Options are "kubeconfig_" or "service-account_" + +kubeconfig_path + Path to kubeconfig file, for cluster authentication. + It defaults to ``~/.kube/config``, which is the default location when + using minikube_. + When auth_method is "service-account" this property is ignored. + +max_retrials + Maximum number of retrials in case of job failure. + +.. _service-account: http://kubernetes.io/docs/user-guide/service-accounts +.. _kubeconfig: http://kubernetes.io/docs/user-guide/kubeconfig-file +.. _minikube: http://kubernetes.io/docs/getting-started-guides/minikube + + [mysql] ------- diff --git a/examples/kubernetes.py b/examples/kubernetes.py new file mode 100644 index 0000000000..eb4ddfd185 --- /dev/null +++ b/examples/kubernetes.py @@ -0,0 +1,65 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2015 Outlier Bio, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +Example Kubernetes Job Task. + +Requires: + +- pykube: ``pip install pykube`` +- A local minikube cluster up and running: http://kubernetes.io/docs/getting-started-guides/minikube/ + +**WARNING**: For Python versions < 3.5 the kubeconfig file must point to a Kubernetes API +hostname, and NOT to an IP address. + +You can run this code example like this: + + .. code:: console + $ luigi --module examples.kubernetes PerlPi --local-scheduler + +Running this code will create a pi-luigi-uuid kubernetes job within the cluster +pointed to by the default context in "~/.kube/config". + +If running within a kubernetes cluster, set auth_method = "service-account" to +access the local cluster. 
+""" + +# import os +# import luigi +from luigi.contrib.kubernetes import KubernetesJobTask + + +class PerlPi(KubernetesJobTask): + + name = "pi" + max_retrials = 3 + spec_schema = { + "containers": [{ + "name": "pi", + "image": "perl", + "command": ["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"] + }] + } + + # defining the two functions below allows for dependency checking, + # but isn't a requirement + # def signal_complete(self): + # with self.output().open('w') as output: + # output.write('') + # + # def output(self): + # target = os.path.join("/tmp", "PerlPi") + # return luigi.LocalTarget(target) diff --git a/luigi/contrib/kubernetes.py b/luigi/contrib/kubernetes.py new file mode 100644 index 0000000000..0714bbd21b --- /dev/null +++ b/luigi/contrib/kubernetes.py @@ -0,0 +1,251 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2015 Outlier Bio, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +""" +Kubernetes Job wrapper for Luigi. + +From the Kubernetes website: + + Kubernetes is an open-source system for automating deployment, scaling, + and management of containerized applications. + +For more information about Kubernetes Jobs: http://kubernetes.io/docs/user-guide/jobs/ + +Requires: + +- pykube: ``pip install pykube`` + +Written and maintained by Marco Capuccini (@mcapuccini). 
+""" + +import luigi +import logging +import uuid +import time + +logger = logging.getLogger('luigi-interface') + +try: + from pykube.config import KubeConfig + from pykube.http import HTTPClient + from pykube.objects import Job +except ImportError: + logger.warning('pykube is not installed. KubernetesJobTask requires pykube.') + + +class kubernetes(luigi.Config): + auth_method = luigi.Parameter( + default="kubeconfig", + description="Authorization method to access the cluster") + kubeconfig_path = luigi.Parameter( + default="~/.kube/config", + description="Path to kubeconfig file for cluster authentication") + max_retrials = luigi.IntParameter( + default=0, + description="Max retrials in event of job failure") + + +class KubernetesJobTask(luigi.Task): + + __POLL_TIME = 5 # see __track_job + kubernetes_config = kubernetes() + + def _init_kubernetes(self): + self.__logger = logger + self.__logger.debug("Kubernetes auth method: " + self.auth_method) + if(self.auth_method == "kubeconfig"): + self.__kube_api = HTTPClient(KubeConfig.from_file(self.kubeconfig_path)) + elif(self.auth_method == "service-account"): + self.__kube_api = HTTPClient(KubeConfig.from_service_account()) + else: + raise ValueError("Illegal auth_method") + self.job_uuid = str(uuid.uuid4().hex) + self.uu_name = self.name + "-luigi-" + self.job_uuid + + @property + def auth_method(self): + """ + This can be set to ``kubeconfig`` or ``service-account``. + It defaults to ``kubeconfig``. + + For more details, please refer to: + + - kubeconfig: http://kubernetes.io/docs/user-guide/kubeconfig-file + - service-account: http://kubernetes.io/docs/user-guide/service-accounts + """ + return self.kubernetes_config.auth_method + + @property + def kubeconfig_path(self): + """ + Path to kubeconfig file used for cluster authentication. + It defaults to "~/.kube/config", which is the default location + when using minikube (http://kubernetes.io/docs/getting-started-guides/minikube). 
+ When auth_method is ``service-account`` this property is ignored. + + **WARNING**: For Python versions < 3.5 kubeconfig must point to a Kubernetes API + hostname, and NOT to an IP address. + + For more details, please refer to: + http://kubernetes.io/docs/user-guide/kubeconfig-file + """ + return self.kubernetes_config.kubeconfig_path + + @property + def name(self): + """ + A name for this job. This task will automatically append a UUID to the + name before submitting it to Kubernetes. + """ + raise NotImplementedError("subclass must define name") + + @property + def labels(self): + """ + Return custom labels for kubernetes job. + example:: + ``{"run_dt": datetime.date.today().strftime('%F')}`` + """ + return {} + + @property + def spec_schema(self): + """ + Kubernetes Job spec schema in JSON format, an example follows. + + .. code-block:: javascript + + { + "containers": [{ + "name": "pi", + "image": "perl", + "command": ["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"] + }], + "restartPolicy": "Never" + } + + **restartPolicy** + + - If restartPolicy is not defined, it will be set to "Never" by default. + - **Warning**: restartPolicy=OnFailure will bypass max_retrials, and restart + the container until success, with the risk of blocking the Luigi task. + + For more information, please refer to: + http://kubernetes.io/docs/user-guide/pods/multi-container/#the-spec-schema + """ + raise NotImplementedError("subclass must define spec_schema") + + @property + def max_retrials(self): + """ + Maximum number of retrials in case of failure. 
+ """ + return self.kubernetes_config.max_retrials + + def __track_job(self): + """Poll job status while active""" + while (self.__get_job_status() == "running"): + self.__logger.debug("Kubernetes job " + self.uu_name + + " is still running") + time.sleep(self.__POLL_TIME) + if(self.__get_job_status() == "succeeded"): + self.__logger.info("Kubernetes job " + self.uu_name + " succeeded") + # Use signal_complete to notify of job completion + self.signal_complete() + else: + raise RuntimeError("Kubernetes job " + self.uu_name + " failed") + + def signal_complete(self): + """Signal job completion for scheduler and dependent tasks. + + Touching a system file is an easy way to signal completion. example:: + .. code-block:: python + + with self.output().open('w') as output_file: + output_file.write('') + """ + pass + + def __get_job_status(self): + """Return the Kubernetes job status""" + # Look for the required job + jobs = Job.objects(self.__kube_api).filter(selector="luigi_task_id=" + + self.job_uuid) + # Raise an exception if no such job found + if len(jobs.response["items"]) == 0: + raise RuntimeError("Kubernetes job " + self.uu_name + " not found") + # Figure out status and return it + job = Job(self.__kube_api, jobs.response["items"][0]) + if ("succeeded" in job.obj["status"] and job.obj["status"]["succeeded"] > 0): + job.scale(replicas=0) # Downscale the job, but keep it for logging + return "succeeded" + if ("failed" in job.obj["status"]): + failed_cnt = job.obj["status"]["failed"] + self.__logger.debug("Kubernetes job " + self.uu_name + + " status.failed: " + str(failed_cnt)) + if (failed_cnt > self.max_retrials): + job.scale(replicas=0) # avoid more retrials + return "failed" + return "running" + + def run(self): + self._init_kubernetes() + # Render job + job_json = { + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": { + "name": self.uu_name, + "labels": { + "spawned_by": "luigi", + "luigi_task_id": self.job_uuid + } + }, + "spec": { + "template": { + 
"metadata": { + "name": self.uu_name + }, + "spec": self.spec_schema + } + } + } + # Update user labels + job_json['metadata']['labels'].update(self.labels) + # Add default restartPolicy if not specified + if ("restartPolicy" not in self.spec_schema): + job_json["spec"]["template"]["spec"]["restartPolicy"] = "Never" + # Submit job + self.__logger.info("Submitting Kubernetes Job: " + self.uu_name) + job = Job(self.__kube_api, job_json) + job.create() + # Track the Job (wait while active) + self.__logger.info("Start tracking Kubernetes Job: " + self.uu_name) + self.__track_job() + + def output(self): + """ + An output target is necessary for checking job completion unless + an alternative complete method is defined. + + Example:: + + return luigi.LocalTarget(os.path.join('/tmp', 'example')) + + """ + pass diff --git a/test/contrib/kubernetes_test.py b/test/contrib/kubernetes_test.py new file mode 100644 index 0000000000..6098e3f2d0 --- /dev/null +++ b/test/contrib/kubernetes_test.py @@ -0,0 +1,103 @@ + +# -*- coding: utf-8 -*- +# +# Copyright 2015 Outlier Bio, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Tests for the Kubernetes Job wrapper. + +Requires: + +- pykube: ``pip install pykube`` +- A local minikube cluster up and running: http://kubernetes.io/docs/getting-started-guides/minikube/ + +**WARNING**: For Python versions < 3.5 the kubeconfig file must point to a Kubernetes API +hostname, and NOT to an IP address. 
+ +Written and maintained by Marco Capuccini (@mcapuccini). +""" + +import unittest +import luigi +import logging +import mock +from luigi.contrib.kubernetes import KubernetesJobTask + +logger = logging.getLogger('luigi-interface') + +try: + from pykube.config import KubeConfig + from pykube.http import HTTPClient + from pykube.objects import Job +except ImportError: + raise unittest.SkipTest('pykube is not installed. This test requires pykube.') + + +class SuccessJob(KubernetesJobTask): + name = "success" + spec_schema = { + "containers": [{ + "name": "hello", + "image": "alpine:3.4", + "command": ["echo", "Hello World!"] + }] + } + + +class FailJob(KubernetesJobTask): + name = "fail" + max_retrials = 3 + spec_schema = { + "containers": [{ + "name": "fail", + "image": "alpine:3.4", + "command": ["You", "Shall", "Not", "Pass"] + }] + } + + +class TestK8STask(unittest.TestCase): + + def test_success_job(self): + success = luigi.run(["SuccessJob", "--local-scheduler"]) + self.assertTrue(success) + + def test_fail_job(self): + fail = FailJob() + self.assertRaises(RuntimeError, fail.run) + # Check for retrials + kube_api = HTTPClient(KubeConfig.from_file("~/.kube/config")) # assumes minikube + jobs = Job.objects(kube_api).filter(selector="luigi_task_id=" + + fail.job_uuid) + self.assertEqual(len(jobs.response["items"]), 1) + job = Job(kube_api, jobs.response["items"][0]) + self.assertTrue("failed" in job.obj["status"]) + self.assertTrue(job.obj["status"]["failed"] > fail.max_retrials) + + @mock.patch.object(KubernetesJobTask, "_KubernetesJobTask__get_job_status") + @mock.patch.object(KubernetesJobTask, "signal_complete") + def test_output(self, mock_signal, mock_job_status): + # mock that the job succeeded + mock_job_status.return_value = "succeeded" + # create a kubernetes job + kubernetes_job = KubernetesJobTask() + # set logger and uu_name due to logging in __track_job() + kubernetes_job._KubernetesJobTask__logger = logger + kubernetes_job.uu_name = "test" + # track 
the job (because it is included in the run method) + kubernetes_job._KubernetesJobTask__track_job() + # Make sure successful job signals + self.assertTrue(mock_signal.called)