
Add support for Kubernetes tolerations #1207

Merged
22 commits merged into Netflix:master on Dec 21, 2022

Conversation

@odracci (Contributor) commented Dec 5, 2022:

This PR adds support for tolerations in the Kubernetes and Argo plugins.

Testing Done:

Using the script flow.py

from metaflow import (
    FlowSpec,
    step,
    timeout,
    kubernetes,
    resources,
)

DEFAULT_RESOURCES = {"cpu": "1", "gpu": "0", "memory": "1"}


class TestTolerationsFlow(FlowSpec):
    @resources(**DEFAULT_RESOURCES)
    @step
    def start(self):
        """
        This is the 'start' step. All flows must have a step named 'start' that
        is the first step in the flow.

        """
        self.next(self.end)

    @resources(**DEFAULT_RESOURCES)
    @step
    def end(self):
        """
        This is the 'end' step. All flows must have an 'end' step, which is the
        last step in the flow.

        """
        print("TestTolerationsFlow is all done.")


if __name__ == "__main__":
    TestTolerationsFlow()

Tested the following commands:

  • python3 flow.py run --with kubernetes
  • python3 flow.py run --with kubernetes:node_selector=app=cpu
  • python3 flow.py run --with kubernetes:node_selector=app=cpu,tolerations='[{"key":"app","value":"cpu","effect":"NoSchedule"}]'
  • METAFLOW_KUBERNETES_TOLERATIONS='[{"key":"app","value":"cpu","effect":"NoSchedule"}]' python3 flow.py run --with kubernetes
  • METAFLOW_KUBERNETES_TOLERATIONS='[{"key":"app","value":"cpu","effect":"NoSchedule"}]' METAFLOW_KUBERNETES_NODE_SELECTOR='app=cpu' python3 flow.py run --with kubernetes
  • METAFLOW_KUBERNETES_TOLERATIONS='[{"key":"app","value":"cpu","effect":"NoSchedule"}]' METAFLOW_KUBERNETES_NODE_SELECTOR='app=cpu' python3 flow.py argo-workflows create
  • python3 flow.py argo-workflows trigger
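
For reference, the value accepted on the CLI and in METAFLOW_KUBERNETES_TOLERATIONS is a JSON list of objects whose keys mirror the Kubernetes client's V1Toleration model. A minimal sketch of that mapping (assuming the kubernetes Python package is installed; an illustration, not necessarily the plugin's exact code):

import json

from kubernetes.client import V1Toleration

raw = '[{"key":"app","value":"cpu","effect":"NoSchedule"}]'
# Each JSON object becomes keyword arguments for a V1Toleration
tolerations = [V1Toleration(**t) for t in json.loads(raw)]
print(tolerations[0].key, tolerations[0].effect)  # -> app NoSchedule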

With the script flow_decorator.py

from metaflow import (
    FlowSpec,
    step,
    timeout,
    kubernetes,
    resources,
)

DEFAULT_RESOURCES = {"cpu": "1", "gpu": "0", "memory": "1"}
NODE_GROUP = "cpu"
KUBERNETES_NODE_SELECTOR = f"app={NODE_GROUP}"
KUBERNETES_TOLERATIONS = [{"key": "app", "effect": "NoSchedule", "value": NODE_GROUP}]


class TestTolerationsFlow(FlowSpec):
    @resources(**DEFAULT_RESOURCES)
    @kubernetes(node_selector=KUBERNETES_NODE_SELECTOR, tolerations=KUBERNETES_TOLERATIONS)
    @step
    def start(self):
        """
        This is the 'start' step. All flows must have a step named 'start' that
        is the first step in the flow.

        """
        self.next(self.end)

    @resources(**DEFAULT_RESOURCES)
    @kubernetes(node_selector=KUBERNETES_NODE_SELECTOR, tolerations=KUBERNETES_TOLERATIONS)
    @step
    def end(self):
        """
        This is the 'end' step. All flows must have an 'end' step, which is the
        last step in the flow.

        """
        print("TestTolerationsFlow is all done.")


if __name__ == "__main__":
    TestTolerationsFlow()

Tested the following commands:

  • python3 flow_decorator.py run
  • python3 flow_decorator.py argo-workflows create
  • python3 flow_decorator.py argo-workflows trigger

In all executions, I verified that the pods have the expected tolerations and node selectors.

@shrinandj (Contributor) commented:

Looks great overall! Can you add some details about the testing you did with this change? It would make reviewing a little easier (see the Testing Done section in this PR as an example).

@savingoyal (Collaborator) left a comment:

Some comments re: code organization. @shrinandj is doing a full review.

Review threads on metaflow/plugins/argo/argo_workflows.py and metaflow/plugins/kubernetes/kubernetes_job.py (all resolved).
@nflx-mf-bot (Collaborator) commented:

Testing[300] @ c21ede9

@nflx-mf-bot (Collaborator) commented:

Testing[300] @ c21ede9 PASSED

@shrinandj (Contributor) commented:

The PR itself looks good to me. Can you confirm that at least the following scenarios have been tested:

  • Basic test for tolerations. Create a flow --with kubernetes:tolerations=[{"key": "arch", "operator": "Equal", "value": "amd"}]. It should run successfully.
  • Basic test for node_selectors since that code was also touched. Create a flow --with kubernetes:tolerations=[{"key": "arch", "operator": "Equal", "value": "amd"}],node_selector="mynode=mylabel". It should run successfully.
  • Ensuring other options haven't regressed. Create a flow --with kubernetes:cpu=2,tolerations=[{"key": "arch", "operator": "Equal", "value": "amd"}]. It should run successfully.
  • Ensuring that default K8s backend works as expected with just... --with kubernetes

Also run the above tests with argo-workflows, and some basic smoke tests WITHOUT kubernetes or Argo.

@odracci (Contributor, Author) commented Dec 7, 2022:

@shrinandj Thanks for the review. I managed to move the input validation into the decorator. I did some quick tests, and it looks good. Could you please review my latest commits?

I will provide a test report based on your input in the following days.

@shrinandj (Contributor) commented:

> Could you please review my latest commits?

I will look into these latest commits later tonight.

  for toleration in self.attributes["tolerations"]:
-     invalid_keys = [k for k in toleration.keys() if k not in V1Toleration.attribute_map.keys()]
+     invalid_keys = [k for k in toleration.keys() if k not in attribute_map]
Contributor:

Why did this have to change? As K8s changes, these attributes could change, right?

Contributor:

(As compared to a previous commit, where V1Toleration.attribute_map.keys() was getting used.)

Contributor (Author):

When the flow runs on Argo, the kubernetes module is unavailable. I can make the validation optional, given that it is only required when the flow runs locally, and KubernetesClient raises an exception if the module is not installed. WDYT?

if self.attributes["tolerations"]:
    try:
        from kubernetes.client import V1Toleration
        for toleration in self.attributes["tolerations"]:
            invalid_keys = [k for k in toleration.keys() if k not in V1Toleration.attribute_map.keys()]
            if len(invalid_keys) > 0:
                raise KubernetesException(
                    "Tolerations parameter contains invalid keys: %s" % invalid_keys
                )
    except (NameError, ImportError):
        pass
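
For context, V1Toleration.attribute_map in the kubernetes client maps Python attribute names to Kubernetes API field names, so those attribute names are exactly the keys this check accepts. A quick way to inspect them (contents shown for a recent client version; they can evolve with the SDK):

from kubernetes.client import V1Toleration

# e.g. {"effect": "effect", "key": "key", "operator": "operator",
#       "toleration_seconds": "tolerationSeconds", "value": "value"}
print(V1Toleration.attribute_map)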

Contributor:

The above looks like a happy compromise to me, in which we try our best to validate AND keep up with upstream changes in K8s.

Contributor (Author):

@shrinandj I pushed the new code.

shrinandj previously approved these changes Dec 12, 2022.

@shrinandj (Contributor) left a comment:

LGTM! Great work!!

@savingoyal (Collaborator) left a comment:

LGTM! Just one minor issue needs to be addressed. We should be good to merge and release right after.

@@ -166,6 +174,9 @@ def echo(msg, stream="stderr", job_id=None):
     stdout_location = ds.get_log_location(TASK_LOG_SOURCE, "stdout")
     stderr_location = ds.get_log_location(TASK_LOG_SOURCE, "stderr")

+    # `node_selector` is a tuple of strings, convert it to a dictionary
+    node_selector = KubernetesDecorator.parse_node_selector(node_selector)
Collaborator:

Is this needed anymore?

Contributor (Author):

Yes, it is. At this stage, node_selector is a tuple of strings; kubernetes.launch_job expects a dictionary.

Collaborator:

Any reason not to handle this parsing within kubernetes_job? The actual format is dictated by the kubernetes SDK, which is why all the Kubernetes-related formatting currently happens within the KubernetesJob object. As the SDK evolves, any changes would be isolated to that object.

Contributor (Author):
node_selector contains the value generated by

@click.option(
    "--node-selector",
    multiple=True,
    default=None,
    help="NodeSelector for Kubernetes pod.",
)

which is a tuple of strings:

('key=val', 'foo=bar')

kubernetes_job expects a dictionary like:

{
  "key": "val",
  "foo": "bar",
}

parse_node_selector converts the tuple of strings to a dictionary compatible with the Kubernetes SDK.
Does that make sense?
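
For illustration, a minimal sketch of what such a conversion could look like (a hypothetical stand-in, not necessarily Metaflow's exact parse_node_selector):

def parse_node_selector(node_selector):
    # Converts ("key=val", "foo=bar") into {"key": "val", "foo": "bar"}
    parsed = {}
    for item in node_selector or ():
        key, _, value = item.partition("=")
        parsed[key.strip()] = value.strip()
    return parsed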

# cased in kubernetes_client.py
if self.attributes["tolerations"]:
    try:
        from kubernetes.client import V1Toleration
Collaborator:

I understand that the rationale for including this check in __init__ is to ensure that it is invoked for argo-workflows too. However, this check will fail if the user hasn't installed the python package kubernetes yet - which is checked in package_init, and that check should technically happen before the check for tolerations.

Contributor (Author):

It is checked in kubernetes_cli. The idea is that the check is invoked only if required, which implies the python package kubernetes must be installed. I think the order of execution doesn't matter: if kubernetes is not available, self.attributes["tolerations"] is not used, so the check is not required. Does that make sense to you?

Collaborator:

When the user pip installs metaflow, we don't install the Kubernetes python package. It's only when the user starts executing a flow that involves @kubernetes or Argo that we throw a nice warning asking them to install the python package. Now, if that first flow has tolerations defined, the user will instead get an error saying no module named kubernetes.

Contributor (Author):
That import is inside a try block with

except (NameError, ImportError):
  pass

It should not raise any errors related to the missing module - is that correct?

Collaborator:

Yes - but it's the roundabout way this check is implemented that concerns me. We can ship this and come back to clean it up.

@savingoyal (Collaborator) commented:

Also, you might want to appease black: https://github.com/Netflix/metaflow/actions/runs/3661106684/jobs/6259649285

@odracci (Contributor, Author) commented Dec 14, 2022:

@shrinandj @savingoyal I improved the error handling in a1711b6.

@odracci (Contributor, Author) commented Dec 14, 2022:

> The PR itself looks good to me. Can you confirm that at least the following scenarios have been tested:
>
>   • Basic test for tolerations. Create a flow --with kubernetes:tolerations=[{"key": "arch", "operator": "Equal", "value": "amd"}]. It should run successfully.
>   • Basic test for node_selectors since that code was also touched. Create a flow --with kubernetes:tolerations=[{"key": "arch", "operator": "Equal", "value": "amd"}],node_selector="mynode=mylabel". It should run successfully.
>   • Ensuring other options haven't regressed. Create a flow --with kubernetes:cpu=2,tolerations=[{"key": "arch", "operator": "Equal", "value": "amd"}]. It should run successfully.
>   • Ensuring that default K8s backend works as expected with just... --with kubernetes
>
> The above tests with argo-workflows. And some basic smoke tests WITHOUT kubernetes or Argo.

@shrinandj Tests done, I've updated the description of this PR.

@shrinandj (Contributor) commented:

I just realized that this PR would be a great reference for implementing some of the other features for K8s support (e.g. volume support).

@odracci (Contributor, Author) commented Dec 19, 2022:

@shrinandj @savingoyal Can you please approve the GitHub workflow?

@savingoyal merged commit 97a5ea5 into Netflix:master on Dec 21, 2022.
@odracci deleted the support-for-kubernetes-tolerations branch on December 29, 2022.
@bbrandt commented Feb 27, 2023:
This seems to be the only documentation for Metaflow's Kubernetes tolerations support, so I'll add my note here.

To allow a Metaflow flow to run on an Azure Spot node pool in AKS, add this to your Metaflow config.json:

"METAFLOW_KUBERNETES_TOLERATIONS":"[{\"key\":\"kubernetes.azure.com/scalesetpriority\",\"value\":\"spot\",\"effect\":\"PreferNoSchedule\"},{\"key\":\"kubernetes.azure.com/scalesetpriority\",\"value\":\"spot\",\"effect\":\"NoSchedule\"}]"

This allows flows to prefer running on a spot instance but fall back to a more expensive node pool when a spot instance is not available.
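
If you'd rather not hand-escape the quotes, here is a small sketch (standard library only) that produces the escaped string from a plain Python list:

import json

tolerations = [
    {"key": "kubernetes.azure.com/scalesetpriority", "value": "spot", "effect": "PreferNoSchedule"},
    {"key": "kubernetes.azure.com/scalesetpriority", "value": "spot", "effect": "NoSchedule"},
]
# The inner dumps builds the JSON list; the outer dumps escapes it
# into a JSON string value ready to paste into config.json.
print(json.dumps(json.dumps(tolerations, separators=(",", ":"))))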
