Skip to content

Commit

Permalink
[@kubernetes port] Allow configurable port number (#1793)
Browse files Browse the repository at this point in the history
* [@kubernetes port] Allow configurable port number

Testing Done:

- Verified that pods with @kubernetes(port=N) start with port N configured
  correctly in the pod spec.
- Verified that pods without port=N work as expected
- Verified that @kubernetes(port=N) works as expected with Argo workflows

* [@kubernetes port] Incorporate review comments
  • Loading branch information
valayDave authored Apr 8, 2024
1 parent 8f576e6 commit 83e97a5
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 1 deletion.
2 changes: 2 additions & 0 deletions metaflow/metaflow_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,8 @@
KUBERNETES_FETCH_EC2_METADATA = from_conf("KUBERNETES_FETCH_EC2_METADATA", False)
# Shared memory in MiB to use for this step
KUBERNETES_SHARED_MEMORY = from_conf("KUBERNETES_SHARED_MEMORY", None)
# Default port number to open on the pods
KUBERNETES_PORT = from_conf("KUBERNETES_PORT", None)

ARGO_WORKFLOWS_KUBERNETES_SECRETS = from_conf("ARGO_WORKFLOWS_KUBERNETES_SECRETS", "")
ARGO_WORKFLOWS_ENV_VARS_TO_SKIP = from_conf("ARGO_WORKFLOWS_ENV_VARS_TO_SKIP", "")
Expand Down
7 changes: 6 additions & 1 deletion metaflow/plugins/argo/argo_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -1371,12 +1371,14 @@ def _container_templates(self):
# Set shared_memory to 0 if it isn't specified. This results
# in Kubernetes using its default value when the pod is created.
shared_memory = resources.get("shared_memory", 0)
port = resources.get("port", None)
if port:
port = int(port)

tmpfs_enabled = use_tmpfs or (tmpfs_size and not use_tmpfs)

if tmpfs_enabled and tmpfs_tempdir:
env["METAFLOW_TEMPDIR"] = tmpfs_path

# Create a ContainerTemplate for this node. Ideally, we would have
# liked to inline this ContainerTemplate and avoid scanning the workflow
# twice, but due to issues with variable substitution, we will have to
Expand Down Expand Up @@ -1435,6 +1437,9 @@ def _container_templates(self):
kubernetes_sdk.V1Container(
name=self._sanitize(node.name),
command=cmds,
ports=[kubernetes_sdk.V1ContainerPort(container_port=port)]
if port
else None,
env=[
kubernetes_sdk.V1EnvVar(name=k, value=str(v))
for k, v in env.items()
Expand Down
2 changes: 2 additions & 0 deletions metaflow/plugins/kubernetes/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ def create_job(
tolerations=None,
labels=None,
shared_memory=None,
port=None,
):
if env is None:
env = {}
Expand Down Expand Up @@ -215,6 +216,7 @@ def create_job(
tmpfs_path=tmpfs_path,
persistent_volume_claims=persistent_volume_claims,
shared_memory=shared_memory,
port=port,
)
.environment_variable("METAFLOW_CODE_SHA", code_package_sha)
.environment_variable("METAFLOW_CODE_URL", code_package_url)
Expand Down
3 changes: 3 additions & 0 deletions metaflow/plugins/kubernetes/kubernetes_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def kubernetes():
multiple=False,
)
@click.option("--shared-memory", default=None, help="Size of shared memory in MiB")
@click.option("--port", default=None, help="Port number to expose from the container")
@click.pass_context
def step(
ctx,
Expand All @@ -134,6 +135,7 @@ def step(
persistent_volume_claims=None,
tolerations=None,
shared_memory=None,
port=None,
**kwargs
):
def echo(msg, stream="stderr", job_id=None, **kwargs):
Expand Down Expand Up @@ -248,6 +250,7 @@ def _sync_metadata():
persistent_volume_claims=persistent_volume_claims,
tolerations=tolerations,
shared_memory=shared_memory,
port=port,
)
except Exception as e:
traceback.print_exc(chain=False)
Expand Down
6 changes: 6 additions & 0 deletions metaflow/plugins/kubernetes/kubernetes_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
KUBERNETES_TOLERATIONS,
KUBERNETES_SERVICE_ACCOUNT,
KUBERNETES_SHARED_MEMORY,
KUBERNETES_PORT,
)
from metaflow.plugins.resources_decorator import ResourcesDecorator
from metaflow.plugins.timeout_decorator import get_run_time_limit_for_task
Expand Down Expand Up @@ -90,6 +91,8 @@ class KubernetesDecorator(StepDecorator):
volumes to the path to which the volume is to be mounted, e.g., `{'pvc-name': '/path/to/mount/on'}`.
shared_memory: int, optional
Shared memory size (in MiB) required for this step
port: int, optional
Port number to expose as a container port on the pod created for this step
"""

name = "kubernetes"
Expand All @@ -113,6 +116,7 @@ class KubernetesDecorator(StepDecorator):
"tmpfs_path": "/metaflow_temp",
"persistent_volume_claims": None, # e.g., {"pvc-name": "/mnt/vol", "another-pvc": "/mnt/vol2"}
"shared_memory": None,
"port": None,
}
package_url = None
package_sha = None
Expand Down Expand Up @@ -200,6 +204,8 @@ def __init__(self, attributes=None, statically_defined=False):
self.attributes["tmpfs_size"] = int(self.attributes["memory"]) // 2
if not self.attributes["shared_memory"]:
self.attributes["shared_memory"] = KUBERNETES_SHARED_MEMORY
if not self.attributes["port"]:
self.attributes["port"] = KUBERNETES_PORT

# Refer https://github.com/Netflix/metaflow/blob/master/docs/lifecycle.png
def step_init(self, flow, graph, step, decos, environment, flow_datastore, logger):
Expand Down
7 changes: 7 additions & 0 deletions metaflow/plugins/kubernetes/kubernetes_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ def create(self):
containers=[
client.V1Container(
command=self._kwargs["command"],
ports=[
client.V1ContainerPort(
container_port=int(self._kwargs["port"])
)
]
if "port" in self._kwargs and self._kwargs["port"]
else None,
env=[
client.V1EnvVar(name=k, value=str(v))
for k, v in self._kwargs.get(
Expand Down

0 comments on commit 83e97a5

Please sign in to comment.