Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: backwards compatible Kubernetes list and kill #2023

Merged
merged 6 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions metaflow/plugins/kubernetes/kube_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from metaflow.exception import CommandException
from metaflow.util import get_username, get_latest_run_id


def parse_cli_options(flow_name, run_id, user, my_runs, echo):
    """Validate and normalise the filters shared by the list/kill commands.

    Parameters
    ----------
    flow_name : name of the flow; returned unchanged.
    run_id : run id given on the command line, or a falsy value if absent.
    user : user name given on the command line, or a falsy value if absent.
    my_runs : truthy when the ``--my-runs`` flag was passed.
    echo : forwarded as the first argument to ``get_latest_run_id``.

    Returns
    -------
    tuple
        ``(flow_name, run_id, user)`` after defaults have been applied.

    Raises
    ------
    CommandException
        If ``--my-runs`` is combined with ``--user`` or ``--run-id``, or if
        no run id was given or could be resolved when no user filter is set.
    """
    if user and my_runs:
        raise CommandException("--user and --my-runs are mutually exclusive.")

    if run_id and my_runs:
        # The option is spelled --run-id on the command line.
        raise CommandException("--run-id and --my-runs are mutually exclusive.")

    if my_runs:
        user = get_username()

    # Only fall back to get_latest_run_id when neither a run id nor a user
    # filter narrows the selection; a user filter alone spans all runs.
    if not run_id and not user:
        run_id = get_latest_run_id(echo, flow_name)
        if run_id is None:
            raise CommandException("A previous run id was not found. Specify --run-id.")

    return flow_name, run_id, user
76 changes: 76 additions & 0 deletions metaflow/plugins/kubernetes/kubernetes_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import time
import traceback

from metaflow.plugins.kubernetes.kube_utils import parse_cli_options
from metaflow.plugins.kubernetes.kubernetes_client import KubernetesClient
import metaflow.tracing as tracing
from metaflow import JSONTypeClass, util
from metaflow._vendor import click
Expand Down Expand Up @@ -305,3 +307,77 @@ def _sync_metadata():
sys.exit(METAFLOW_EXIT_DISALLOW_RETRY)
finally:
_sync_metadata()


@kubernetes.command(help="List all runs of the flow on Kubernetes.")
@click.option(
    "--my-runs",
    default=False,
    is_flag=True,
    help="List all my unfinished tasks.",
)
@click.option("--user", default=None, help="List unfinished tasks for the given user.")
@click.option(
    "--run-id",
    default=None,
    help="List unfinished tasks corresponding to the run id.",
)
@click.pass_obj
def list(obj, run_id, user, my_runs):
    """Echo one line per pod returned by KubernetesClient.list for this flow.

    The run/user filters are first normalised through parse_cli_options.
    When no pods are returned, a single "no active jobs" line is echoed
    instead.
    """
    flow_name, run_id, user = parse_cli_options(
        obj.flow.name, run_id, user, my_runs, obj.echo
    )
    active_pods = KubernetesClient().list(obj.flow.name, run_id, user)

    if not active_pods:
        obj.echo("No active jobs found for *%s* on Kubernetes." % (flow_name))
        return

    row_template = (
        "Run: *{run_id}* Pod: *{pod_id}* Started At: {startedAt} Status: *{status}*"
    )
    for pod in active_pods:
        meta = pod.metadata
        # Prefer the metaflow run id annotation and fall back to the Argo
        # Workflows label when the annotation is absent.
        run_label = meta.annotations.get(
            "metaflow/run_id",
            meta.labels.get("workflows.argoproj.io/workflow"),
        )
        start_time = pod.status.start_time
        # A missing start time is rendered as "-".
        started_at = (
            "-" if start_time is None else start_time.strftime("%Y-%m-%d %H:%M:%S")
        )
        obj.echo(
            row_template.format(
                run_id=run_label,
                pod_id=meta.name,
                startedAt=started_at,
                status=pod.status.phase,
            )
        )
saikonen marked this conversation as resolved.
Show resolved Hide resolved


@kubernetes.command(
    help="Terminate Kubernetes tasks for this flow. Only terminates current tasks, but will not affect future ones from retries."
)
@click.option(
    "--my-runs",
    default=False,
    is_flag=True,
    help="Kill all my unfinished tasks.",
)
@click.option(
    "--user",
    default=None,
    help="Terminate unfinished tasks for the given user.",
)
@click.option(
    "--run-id",
    default=None,
    help="Terminate unfinished tasks corresponding to the run id.",
)
@click.pass_obj
def kill(obj, run_id, user, my_runs):
    """Normalise the CLI filters and hand them to KubernetesClient.kill_pods."""
    # parse_cli_options returns (flow_name, run_id, user), which is exactly
    # the leading positional argument order of kill_pods.
    filters = parse_cli_options(obj.flow.name, run_id, user, my_runs, obj.echo)
    KubernetesClient().kill_pods(*filters, obj.echo)
88 changes: 88 additions & 0 deletions metaflow/plugins/kubernetes/kubernetes_client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from concurrent.futures import ThreadPoolExecutor
saikonen marked this conversation as resolved.
Show resolved Hide resolved
import os
import sys
import time

from metaflow.exception import MetaflowException
from metaflow.metaflow_config import KUBERNETES_NAMESPACE

from .kubernetes_job import KubernetesJob, KubernetesJobSet

Expand All @@ -28,6 +30,7 @@ def __init__(self):
% sys.executable
)
self._refresh_client()
self._namespace = KUBERNETES_NAMESPACE

def _refresh_client(self):
from kubernetes import client, config
Expand Down Expand Up @@ -60,6 +63,91 @@ def get(self):

return self._client

def _find_active_pods(self, flow_name, run_id=None, user=None):
    """Yield pods in ``self._namespace`` that match the given flow filters.

    Pods are requested page by page with a field selector that excludes the
    Succeeded and Failed phases. A pod is yielded when it has both
    annotations and labels, its ``metaflow/flow_name`` annotation equals
    ``flow_name``, and it passes the optional ``run_id`` and ``user``
    filters. A leading ``argo-`` on ``run_id`` is removed before comparing.
    """

    def _fetch_page(token=None):
        # One page of the (possibly paginated) pod listing.
        return self._client.CoreV1Api().list_namespaced_pod(
            namespace=self._namespace,
            # limited selector support for K8S api. We want to cover multiple statuses: Running / Pending / Unknown
            field_selector="status.phase!=Succeeded,status.phase!=Failed",
            limit=1000,
            _continue=token,
        )

    def _is_match(pod):
        annotations = pod.metadata.annotations
        labels = pod.metadata.labels
        # arbitrary pods might have no annotations (or labels) at all.
        if not annotations or not labels:
            return False
        if run_id is not None:
            by_annotation = annotations.get("metaflow/run_id") == run_id
            # we want to also match pods launched by argo-workflows
            by_argo_label = labels.get("workflows.argoproj.io/workflow") == run_id
            if not (by_annotation or by_argo_label):
                return False
        if user is not None and annotations.get("metaflow/user") != user:
            return False
        return annotations.get("metaflow/flow_name") == flow_name

    page = _fetch_page()

    # handle argo prefixes in run_id
    if run_id is not None and run_id.startswith("argo-"):
        run_id = run_id[len("argo-") :]

    while page.items or page.metadata._continue:
        for pod in page.items:
            if _is_match(pod):
                yield pod
        next_token = page.metadata._continue
        if not next_token:
            break
        page = _fetch_page(next_token)

def list(self, flow_name, run_id, user):
    """Return the pods yielded by ``_find_active_pods`` as a concrete list."""
    # Drain the generator eagerly so callers can test emptiness and re-iterate.
    return [*self._find_active_pods(flow_name, run_id, user)]

def kill_pods(self, flow_name, run_id, user, echo):
    """Best-effort termination of every pod found by ``_find_active_pods``.

    For each matching pod, ``/sbin/killall5`` is executed through ``/bin/sh``
    inside the pod's ``main`` container using the Kubernetes exec stream.
    Pods are handled concurrently on a thread pool. A failure for one pod is
    reported through ``echo`` and does not stop the others.
    """
    from kubernetes.stream import stream

    core_api = self._client.CoreV1Api()
    candidates = self._find_active_pods(flow_name, run_id, user)

    def _terminate(pod):
        pod_name = pod.metadata.name
        pod_namespace = pod.metadata.namespace
        echo("Killing Kubernetes pod %s in namespace %s\n" % (pod_name, pod_namespace))
        try:
            stream(
                core_api.connect_get_namespaced_pod_exec,
                name=pod_name,
                namespace=pod_namespace,
                # required for argo-workflows due to multiple containers in a pod
                container="main",
                command=["/bin/sh", "-c", "/sbin/killall5"],
                stderr=True,
                stdin=False,
                stdout=True,
                tty=False,
            )
        except Exception as err:
            # best effort kill for pod can fail.
            echo("failed to kill pod %s: %s" % (pod_name, err))

    with ThreadPoolExecutor() as pool:
        # Resolve the full pod listing before scheduling any kill, then let
        # the context manager wait for all submitted tasks on exit.
        for pod in list(candidates):
            pool.submit(_terminate, pod)

def jobset(self, **kwargs):
return KubernetesJobSet(self, **kwargs)

Expand Down
Loading