Support for Kubernetes Jobs #1906
Conversation
I didn't look much at the actual code that matters. Can you find somebody with kubernetes knowledge to review?
Also, please write about if and how you've used this in production already. :)
examples/kubernetes_job.py (outdated)

    if __name__ == "__main__":
        luigi.run(['PerlPi', '--local-scheduler'])
Let's remove this as it's discouraged nowadays.
examples/kubernetes_job.py (outdated)

    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    #
Can you add module docs, similar to those of the execution summary example?
Is the docstring below good?
Yes. I think this is very good. :)
test/contrib/k8s_job_test.py (outdated)

    self.assertTrue(job.obj["status"]["failed"] > fail.max_retrials)

    if __name__ == "__main__":
        unittest.main()
Remove last 2 lines please. :)
Hi @Tarrasch, thanks for this first review. I talked to @pcm32. He doesn't have experience with Luigi, but he made a Kubernetes Job wrapper for Galaxy, so he could take a look at the code. However, he can't do this anytime soon. If you know someone with experience in both Kubernetes and Luigi, that would work better, I think. I haven't used this in production yet, but I will very soon: I will try to reproduce a scientific workflow in a cloud environment. I also have a question about the CI. I think I can fix some of the failing checks, but ultimately, to run the tests that I wrote, you need a minikube cluster local to Travis (or some other Kubernetes cluster). Is that feasible in your setup?
As for CI, you can annotate the tests (like we do for HDFS and many other systems), but I think you can skip making an actual Travis build for them.
We're using Luigi and Kubernetes in production at my shop. I will try to take a look as time permits.
Thanks @tym-oao, I would really appreciate that. I will report back if I succeed in using this in production too.
Sorry. I previously mis-clicked approve ...
Hello, I'm at the same shop as @tym-oao.
I was able to get this to work for a few trivial tests, but these changes would be appreciated. Also, I think the task should be required to define an output, to fit it more easily into a workflow, so including:

    def output(self):
        raise NotImplementedError()

(unless that seems unreasonable)
luigi/contrib/k8s_job.py (outdated)

    A name for this job. This task will automatically append a UUID to the
    name before submitting to Kubernetes.
    """
    pass
I would probably also have this as `raise NotImplementedError("subclass must define name")`, or you can keep both this and the one below as `pass`.
luigi/contrib/k8s_job.py (outdated)

    self.__logger.debug("Kubernetes job " + self.uu_name + " is still running")
    time.sleep(self.__POLL_TIME)
    if(self.__get_job_status() == "succeeded"):
        self.__logger.info("Kubernetes job " + self.uu_name + " succeeded")
I think it would make sense to touch some sort of output at this point to signal job completion.
Maybe something along the lines of:

    with self.output().open('w') as output_file:
        output_file.write('')

Just to touch the required output file.
Thanks a lot for testing and reviewing! I will wrap it up soon and come back to you guys. I am also working on a more complex analysis for a bioinformatics paper; I suggest this PR gets merged after I complete that, so that if I find something missing I can add it.
After looking into it more, it was a configuration issue on my part. I am getting some issues when trying to require a k8s task from another task, but I think that's expected without the task having an output. :)
These changes would be helpful for ascertaining job completion.
If this is added, I would also add an output to the tests, otherwise they will fail.
luigi/contrib/k8s_job.py (outdated)

    job.scale(replicas=0)  # avoid more retrials
    return "failed"
    return "running"
I think adding this to the method would be really helpful, and it fixes an error that I had while testing (there's no way to tell that the k8s job is complete):

    def output(self):
        """Implement an output to allow for dependency chaining"""
        raise NotImplementedError("Subclass must define output")
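The pattern the reviewers converge on here (a subclass-defined output that is touched on success, so Luigi's completeness check and dependency chaining work) can be sketched without Luigi itself. The class and method names below are illustrative stand-ins, not the PR's actual code; in the real task, `output()` returns a Luigi `Target` and `complete()` comes from `luigi.Task`.

```python
import os
import tempfile

class JobWithMarkerOutput:
    """Minimal stand-in for a task whose completeness is signalled
    by touching a marker file once the Kubernetes job succeeds."""

    def __init__(self, marker_path):
        self.marker_path = marker_path

    def complete(self):
        # Luigi considers a task done when its output exists.
        return os.path.exists(self.marker_path)

    def on_success(self):
        # Touch the marker file, mirroring the suggested:
        #   with self.output().open('w') as output_file:
        #       output_file.write('')
        with open(self.marker_path, 'w') as output_file:
            output_file.write('')

marker = os.path.join(tempfile.mkdtemp(), 'job-done.marker')
job = JobWithMarkerOutput(marker)
assert not job.complete()   # no marker yet: task is not complete
job.on_success()
assert job.complete()       # marker touched: dependent tasks may now run
```

This is why requiring a k8s task from another task fails without an output: the scheduler has no way to observe that the upstream job finished.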
The central scheduler won't behave correctly when the task … You could accomplish that a different way, but this will allow it to run with either the local scheduler or the central scheduler.
@mcapuccini Not sure if you have the time to work on this right now, so I addressed the changes that @Tarrasch and I requested here. No rush on this, just thought I'd do my best to help things along. 😄
@henryrizzi thanks for your review and comments, I really appreciate it. I added you as a collaborator on my fork, so you can add your improvements to this PR straight away. At the moment I have some other, higher-priority tasks to work on, but in a couple of weeks I'll be able to test this on a real use case.
…fo docs, update tests and example
@mcapuccini Thanks for adding me as a collaborator to your fork and for making the PR!
Make changes to the task to allow for central scheduling
I believe I replied to all your comments. The last change (which you nicely reminded me of) is to change the names. Just name them like: …
Does that sound reasonable? Also, the config class you'll create should be named accordingly.
@henryrizzi will you take care of the latest change requests, or shall I do it?
I can take care of the latest changes and put in another pull request to your branch. 👍
Name Changes + luigi.Config class addition due to feedback
This looks ready to merge, except that Travis is red. Once that's fixed I'm OK with this getting merged. :)
@Tarrasch I am doing some tests on a real pipeline these days. There are some things to be fixed. Next week I'll be at a conference, so it will be ready to merge in a couple of weeks.
Waiting for @henryrizzi to review the latest changes before I get them merged. I have successfully run my workflow (https://github.com/phnmnl/jupyter-demo/blob/master/preprocessing_workflow.py) on a real k8s cluster.
…ma is defined as a method, plus some comment refactoring
Fix restart policy not picked
This reverts commit a2023ee.
@mcapuccini Thanks for this! I'm at the beginning of trying to run tasks on an OpenShift cluster, and this will help a ton. Just curious what your thoughts are from a design/implementation perspective: …
@colemanja91 yes, I run Luigi inside a container. What I like a lot is running a custom Jupyter image where I can edit and run my Luigi workflows. This is not challenging at all, you just need to use the … I am very soon going to integrate Luigi into KubeNow to enable data science pipelines on top of it.
@Tarrasch I am quite confident that the build is going to pass this time. Then it should be ready to be merged, IMO 🙂
Thanks!
@mcapuccini great job here, thanks! There is a "watch" method in the Kubernetes API that returns the status as a stream; it might be useful for reducing both the number of calls and the delay in getting the response.
@apierleoni I have tested it with 40 parallel Jobs with no problems. The watch method is interesting; if someone reports problems with polling the Kubernetes API, we should change the implementation.
Thanks for the info.
Support for Kubernetes Jobs
Description
I added a Task extension that enables running Jobs in a Kubernetes cluster.
Motivation and Context
This enables distributing tasks, packaged as lightweight application containers, across a Kubernetes cluster. There is a feature proposal: #1549.
Have you tested this? If so, how?
I have included unit tests. To run them locally you need a minikube cluster up and running.
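At its core, such a task wraps a plain Kubernetes Job spec. The dict below is an illustrative sketch only: the container runs the classic "compute pi in Perl" Kubernetes Job example (matching the PerlPi task in examples/kubernetes_job.py), and the surrounding field names follow the Kubernetes Job API rather than any property the PR is guaranteed to expose.

```python
# Illustrative Kubernetes Job pod spec, as a plain dict. The container
# follows the well-known "compute pi in Perl" Kubernetes Job example.
spec = {
    "containers": [
        {
            "name": "pi",
            "image": "perl",
            "command": ["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
        }
    ],
    # Jobs should not be restarted by the kubelet; retries are handled
    # by the Job controller (cf. the "restart policy not picked" fix above).
    "restartPolicy": "Never",
}

# Basic sanity checks on the structure.
assert spec["containers"][0]["image"] == "perl"
assert spec["restartPolicy"] == "Never"
```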