From 86b0aa2d89a7ecb4b6caee5e55c52cab9496eb98 Mon Sep 17 00:00:00 2001 From: Eoin Fennessy Date: Thu, 5 Jun 2025 10:50:23 +0100 Subject: [PATCH 1/2] feat(trainer): Introduce `LocalTrainerClient` Signed-off-by: Eoin Fennessy --- python/kubeflow/trainer/__init__.py | 3 + .../trainer/api/abstract_trainer_client.py | 62 ++++ .../trainer/api/local_trainer_client.py | 244 ++++++++++++++++ python/kubeflow/trainer/api/trainer_client.py | 36 +-- .../local_runtimes/torch_distributed.yaml | 34 +++ .../kubeflow/trainer/constants/constants.py | 21 ++ .../kubeflow/trainer/job_runners/__init__.py | 3 + .../trainer/job_runners/docker_job_runner.py | 267 ++++++++++++++++++ .../trainer/job_runners/job_runner.py | 58 ++++ python/kubeflow/trainer/types/types.py | 20 ++ python/kubeflow/trainer/utils/utils.py | 31 ++ python/pyproject.toml | 2 +- 12 files changed, 749 insertions(+), 32 deletions(-) create mode 100644 python/kubeflow/trainer/api/abstract_trainer_client.py create mode 100644 python/kubeflow/trainer/api/local_trainer_client.py create mode 100644 python/kubeflow/trainer/config/local_runtimes/torch_distributed.yaml create mode 100644 python/kubeflow/trainer/job_runners/__init__.py create mode 100644 python/kubeflow/trainer/job_runners/docker_job_runner.py create mode 100644 python/kubeflow/trainer/job_runners/job_runner.py diff --git a/python/kubeflow/trainer/__init__.py b/python/kubeflow/trainer/__init__.py index 83ab9ff37..3c3c41c51 100644 --- a/python/kubeflow/trainer/__init__.py +++ b/python/kubeflow/trainer/__init__.py @@ -20,6 +20,9 @@ # Import the Kubeflow Trainer client. from kubeflow.trainer.api.trainer_client import TrainerClient +# Import the Kubeflow Local Trainer client. +from kubeflow.trainer.api.local_trainer_client import LocalTrainerClient + # Import the Kubeflow Trainer constants. 
from kubeflow.trainer.constants.constants import DATASET_PATH, MODEL_PATH diff --git a/python/kubeflow/trainer/api/abstract_trainer_client.py b/python/kubeflow/trainer/api/abstract_trainer_client.py new file mode 100644 index 000000000..189f93a41 --- /dev/null +++ b/python/kubeflow/trainer/api/abstract_trainer_client.py @@ -0,0 +1,62 @@ +# Copyright 2025 The Kubeflow Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABC, abstractmethod +from typing import Dict, List, Optional + +from kubeflow.trainer.constants import constants +from kubeflow.trainer.types import types + + +class AbstractTrainerClient(ABC): + @abstractmethod + def delete_job(self, name: str): + pass + + @abstractmethod + def get_job(self, name: str) -> types.TrainJob: + pass + + @abstractmethod + def get_job_logs( + self, + name: str, + follow: Optional[bool] = False, + step: str = constants.NODE, + node_rank: int = 0, + ) -> Dict[str, str]: + pass + + @abstractmethod + def get_runtime(self, name: str) -> types.Runtime: + pass + + @abstractmethod + def list_jobs( + self, runtime: Optional[types.Runtime] = None + ) -> List[types.TrainJob]: + pass + + @abstractmethod + def list_runtimes(self) -> List[types.Runtime]: + pass + + @abstractmethod + def train( + self, + runtime: types.Runtime = types.DEFAULT_RUNTIME, + initializer: Optional[types.Initializer] = None, + trainer: Optional[types.CustomTrainer] = None, + ) -> str: + pass diff --git 
a/python/kubeflow/trainer/api/local_trainer_client.py b/python/kubeflow/trainer/api/local_trainer_client.py new file mode 100644 index 000000000..415d5121a --- /dev/null +++ b/python/kubeflow/trainer/api/local_trainer_client.py @@ -0,0 +1,244 @@ +# Copyright 2025 The Kubeflow Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from importlib import resources +from pathlib import Path +from typing import Dict, List, Optional + +import yaml +from kubeflow.trainer import models +from kubeflow.trainer.api.abstract_trainer_client import AbstractTrainerClient +from kubeflow.trainer.constants import constants +from kubeflow.trainer.job_runners import DockerJobRunner, JobRunner +from kubeflow.trainer.types import types +from kubeflow.trainer.utils import utils + + +class LocalTrainerClient(AbstractTrainerClient): + """LocalTrainerClient exposes functionality for running training jobs locally. + + A Kubernetes cluster is not required. + It exposes the same interface as the TrainerClient. + + Args: + local_runtimes_path: The path to the directory containing runtime YAML files. + Defaults to the runtimes included with the package. + job_runner: The job runner to use for local training. + Options include the DockerJobRunner and PodmanJobRunner. + Defaults to the Docker job runner. 
+ """ + + def __init__( + self, + local_runtimes_path: Optional[Path] = None, + job_runner: Optional[JobRunner] = None, + ): + print( + "Warning: LocalTrainerClient is an alpha feature for Kubeflow Trainer. " + "Some features may be unstable or unimplemented." + ) + + if local_runtimes_path is None: + self.local_runtimes_path = ( + resources.files(constants.PACKAGE_NAME) / constants.LOCAL_RUNTIMES_PATH + ) + else: + self.local_runtimes_path = local_runtimes_path + + if job_runner is None: + self.job_runner = DockerJobRunner() + else: + self.job_runner = job_runner + + def list_runtimes(self) -> List[types.Runtime]: + """Lists all runtimes. + + Returns: + A list of runtime objects. + """ + runtimes = [] + for cr in self.__list_runtime_crs(): + runtimes.append(utils.get_runtime_from_crd(cr)) + return runtimes + + def get_runtime(self, name: str) -> types.Runtime: + """Get a specific runtime by name. + + Args: + name: The name of the runtime. + + Returns: + A runtime object. + + Raises: + RuntimeError: if the specified runtime cannot be found. + """ + for r in self.list_runtimes(): + if r.name == name: + return r + raise RuntimeError(f"No runtime found with name '{name}'") + + def train( + self, + runtime: types.Runtime = types.DEFAULT_RUNTIME, + initializer: Optional[types.Initializer] = None, + trainer: Optional[types.CustomTrainer] = None, + ) -> str: + """Starts a training job. + + Args: + runtime: Config for the train job's runtime. + trainer: Config for the function that encapsulates the model training process. + initializer: Config for dataset and model initialization. + + Returns: + The generated name of the training job. + + Raises: + RuntimeError: if the specified runtime cannot be found, + or the runtime container cannot be found, + or the runtime container image is not specified. 
+ """ + runtime_cr = self.__get_runtime_cr(runtime.name) + if runtime_cr is None: + raise RuntimeError(f"No runtime found with name '{runtime.name}'") + + runtime_container = utils.get_runtime_trainer_container( + runtime_cr.spec.template.spec.replicated_jobs + ) + if runtime_container is None: + raise RuntimeError("No runtime container found") + + image = runtime_container.image + if image is None: + raise RuntimeError("No runtime container image specified") + + if trainer and trainer.func: + entrypoint, command = utils.get_entrypoint_using_train_func( + runtime, + trainer.func, + trainer.func_args, + trainer.pip_index_url, + trainer.packages_to_install, + ) + else: + entrypoint = runtime_container.command + command = runtime_container.args + + if trainer and trainer.num_nodes: + num_nodes = trainer.num_nodes + else: + num_nodes = 1 + + train_job_name = self.job_runner.create_job( + image=image, + entrypoint=entrypoint, + command=command, + num_nodes=num_nodes, + framework=runtime.trainer.framework, + runtime_name=runtime.name, + ) + return train_job_name + + def list_jobs( + self, runtime: Optional[types.Runtime] = None + ) -> List[types.TrainJob]: + """Lists all training jobs. + + Args: + runtime: If provided, only return jobs that use the given runtime. + + Returns: + A list of training jobs. + """ + runtime_name = runtime.name if runtime else None + container_jobs = self.job_runner.list_jobs(runtime_name) + + train_jobs = [] + for container_job in container_jobs: + train_jobs.append(self.__container_job_to_train_job(container_job)) + return train_jobs + + def get_job(self, name: str) -> types.TrainJob: + """Get a specific training job by name. + + Args: + name: The name of the training job to get. + + Returns: + A training job. 
+ """ + container_job = self.job_runner.get_job(name) + return self.__container_job_to_train_job(container_job) + + def get_job_logs( + self, + name: str, + follow: Optional[bool] = False, + step: str = constants.NODE, + node_rank: int = 0, + ) -> Dict[str, str]: + """Gets logs for the specified training job + Args: + name (str): The name of the training job + follow (bool): If true, follows job logs and prints them to standard out (default False) + step (str): The training job step to target (default "node") + node_rank (int): The node rank to retrieve logs from (default 0) + + Returns: + Dict[str, str]: The logs of the training job, where the key is the + step and node rank, and the value is the logs for that node. + """ + return self.job_runner.get_job_logs( + job_name=name, follow=follow, step=step, node_rank=node_rank + ) + + def delete_job(self, name: str): + """Deletes a specific training job. + + Args: + name: The name of the training job to delete. + """ + self.job_runner.delete_job(job_name=name) + + def __list_runtime_crs(self) -> List[models.TrainerV1alpha1ClusterTrainingRuntime]: + runtime_crs = [] + for filename in self.local_runtimes_path.iterdir(): + with open(filename, "r") as f: + cr_str = f.read() + cr_dict = yaml.safe_load(cr_str) + cr = models.TrainerV1alpha1ClusterTrainingRuntime.from_dict(cr_dict) + if cr is not None: + runtime_crs.append(cr) + return runtime_crs + + def __get_runtime_cr( + self, + name: str, + ) -> Optional[models.TrainerV1alpha1ClusterTrainingRuntime]: + for cr in self.__list_runtime_crs(): + if cr.metadata.name == name: + return cr + return None + + def __container_job_to_train_job( + self, container_job: types.ContainerJob + ) -> types.TrainJob: + return types.TrainJob( + name=container_job.name, + creation_timestamp=container_job.creation_timestamp, + steps=[container.to_step() for container in container_job.containers], + runtime=self.get_runtime(container_job.runtime_name), + status=container_job.status, + ) diff --git
a/python/kubeflow/trainer/api/trainer_client.py b/python/kubeflow/trainer/api/trainer_client.py index 75bb59627..3d3c82a31 100644 --- a/python/kubeflow/trainer/api/trainer_client.py +++ b/python/kubeflow/trainer/api/trainer_client.py @@ -15,12 +15,10 @@ import logging import multiprocessing import queue -import random -import string -import uuid from typing import Dict, List, Optional import kubeflow.trainer.models as models +from kubeflow.trainer.api.abstract_trainer_client import AbstractTrainerClient from kubeflow.trainer.constants import constants from kubeflow.trainer.types import types from kubeflow.trainer.utils import utils @@ -29,7 +27,7 @@ logger = logging.getLogger(__name__) -class TrainerClient: +class TrainerClient(AbstractTrainerClient): def __init__( self, config_file: Optional[str] = None, @@ -105,7 +103,7 @@ def list_runtimes(self) -> List[types.Runtime]: return result for runtime in runtime_list.items: - result.append(self.__get_runtime_from_crd(runtime)) + result.append(utils.get_runtime_from_crd(runtime)) except multiprocessing.TimeoutError: raise TimeoutError( @@ -147,7 +145,7 @@ def get_runtime(self, name: str) -> types.Runtime: f"{self.namespace}/{name}" ) - return self.__get_runtime_from_crd(runtime) # type: ignore + return utils.get_runtime_from_crd(runtime) # type: ignore def train( self, @@ -179,7 +177,7 @@ def train( # Generate unique name for the TrainJob. # TODO (andreyvelich): Discuss this TrainJob name generation. - train_job_name = random.choice(string.ascii_lowercase) + uuid.uuid4().hex[:11] + train_job_name = utils.generate_train_job_name() # Build the Trainer. 
trainer_crd = models.TrainerV1alpha1Trainer() @@ -463,30 +461,6 @@ def delete_job(self, name: str): f"{constants.TRAINJOB_KIND} {self.namespace}/{name} has been deleted" ) - def __get_runtime_from_crd( - self, - runtime_crd: models.TrainerV1alpha1ClusterTrainingRuntime, - ) -> types.Runtime: - - if not ( - runtime_crd.metadata - and runtime_crd.metadata.name - and runtime_crd.spec - and runtime_crd.spec.ml_policy - and runtime_crd.spec.template.spec - and runtime_crd.spec.template.spec.replicated_jobs - ): - raise Exception(f"ClusterTrainingRuntime CRD is invalid: {runtime_crd}") - - return types.Runtime( - name=runtime_crd.metadata.name, - trainer=utils.get_runtime_trainer( - runtime_crd.spec.template.spec.replicated_jobs, - runtime_crd.spec.ml_policy, - runtime_crd.metadata, - ), - ) - def __get_trainjob_from_crd( self, trainjob_crd: models.TrainerV1alpha1TrainJob, diff --git a/python/kubeflow/trainer/config/local_runtimes/torch_distributed.yaml b/python/kubeflow/trainer/config/local_runtimes/torch_distributed.yaml new file mode 100644 index 000000000..95367e878 --- /dev/null +++ b/python/kubeflow/trainer/config/local_runtimes/torch_distributed.yaml @@ -0,0 +1,34 @@ +apiVersion: trainer.kubeflow.org/v1alpha1 +kind: ClusterTrainingRuntime +metadata: + name: torch-distributed +spec: + mlPolicy: + numNodes: 1 + torch: + numProcPerNode: auto + template: + spec: + replicatedJobs: + - name: node + template: + metadata: + labels: + trainer.kubeflow.org/trainjob-ancestor-step: trainer + spec: + template: + spec: + containers: + - name: node + image: pytorch/pytorch:2.5.0-cuda12.4-cudnn9-runtime + command: + - /bin/bash + - -c + - | + echo "Torch Distributed Runtime" + + echo "--------------------------------------" + echo "Torch Default Runtime Env" + env | grep PET_ + + pip list diff --git a/python/kubeflow/trainer/constants/constants.py b/python/kubeflow/trainer/constants/constants.py index 63fca9372..71a3c0dce 100644 --- 
a/python/kubeflow/trainer/constants/constants.py +++ b/python/kubeflow/trainer/constants/constants.py @@ -125,3 +125,24 @@ # The Instruct Datasets class in torchtune TORCHTUNE_INSTRUCT_DATASET = "torchtune.datasets.instruct_dataset" + +# The default path to the directory containing local training runtime configs. +LOCAL_RUNTIMES_PATH = "trainer/config/local_runtimes" + +# The label key used to associate container resources with a train job name. +CONTAINER_TRAIN_JOB_NAME_LABEL = "trainer.kubeflow.org/train-job-name" + +# The label key used to associate container resources with a runtime. +CONTAINER_RUNTIME_LABEL = "trainer.kubeflow.org/runtime" + +# The prefix given to names used for local train jobs. +LOCAL_TRAIN_JOB_NAME_PREFIX = "kubeflow-trainer-" + +# The key for the label used to indicate the rank of a local container node. +LOCAL_NODE_RANK_LABEL = "trainer.kubeflow.org/node-rank" + +# The port number exposed by torch head nodes. +TORCH_HEAD_NODE_PORT = 29500 + +# The name of the Kubeflow Trainer SDK package. +PACKAGE_NAME = "kubeflow" diff --git a/python/kubeflow/trainer/job_runners/__init__.py b/python/kubeflow/trainer/job_runners/__init__.py new file mode 100644 index 000000000..0b468c3fa --- /dev/null +++ b/python/kubeflow/trainer/job_runners/__init__.py @@ -0,0 +1,3 @@ +# flake8: noqa: F401 + +from kubeflow.trainer.job_runners.docker_job_runner import DockerJobRunner, JobRunner diff --git a/python/kubeflow/trainer/job_runners/docker_job_runner.py b/python/kubeflow/trainer/job_runners/docker_job_runner.py new file mode 100644 index 000000000..fd9f24f66 --- /dev/null +++ b/python/kubeflow/trainer/job_runners/docker_job_runner.py @@ -0,0 +1,267 @@ +# Copyright 2025 The Kubeflow Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime +from typing import Dict, List, Optional + +import docker +from kubeflow.trainer.constants import constants +from kubeflow.trainer.job_runners.job_runner import JobRunner +from kubeflow.trainer.types import types +from kubeflow.trainer.utils import utils + + +class DockerJobRunner(JobRunner): + """DockerJobRunner creates and manages training jobs using Docker. + + Args: + docker_client: If provided, this client is used for Docker API calls. + If not provided, a new client will be created from the user's environment. + """ + + def __init__(self, docker_client: Optional[docker.DockerClient] = None): + if docker_client is None: + self.docker_client = docker.from_env() + else: + self.docker_client = docker_client + + def create_job( + self, + image: str, + entrypoint: List[str], + command: List[str], + num_nodes: int, + framework: types.Framework, + runtime_name: str, + ) -> str: + """Creates a training job. + + Args: + image: The name of the container image to use for the job. + entrypoint: The entrypoint for the container. + command: The command to run in the container. + num_nodes: The number of nodes to run the job on. + framework: The framework being used. + runtime_name: The name of the runtime being used. + + Returns: + The name of the created job. + + Raises: + RuntimeError: If the framework provided is not supported. 
+ """ + if framework != types.Framework.TORCH: + raise RuntimeError(f"Framework '{framework}' is not currently supported.") + + train_job_name = ( + f"{constants.LOCAL_TRAIN_JOB_NAME_PREFIX}{utils.generate_train_job_name()}" + ) + + docker_network = self.docker_client.networks.create( + name=train_job_name, + driver="bridge", + labels={ + constants.CONTAINER_TRAIN_JOB_NAME_LABEL: train_job_name, + constants.CONTAINER_RUNTIME_LABEL: runtime_name, + }, + ) + + for i in range(num_nodes): + self.docker_client.containers.run( + name=f"{train_job_name}-{i}", + network=docker_network.id, + image=image, + entrypoint=entrypoint, + command=command, + labels={ + constants.CONTAINER_TRAIN_JOB_NAME_LABEL: train_job_name, + constants.LOCAL_NODE_RANK_LABEL: str(i), + constants.CONTAINER_RUNTIME_LABEL: runtime_name, + }, + environment=self.__get_container_environment( + framework=framework, + head_node_address=f"{train_job_name}-0", + num_nodes=num_nodes, + node_rank=i, + ), + detach=True, + ) + + return train_job_name + + def get_job(self, job_name: str) -> types.ContainerJob: + """Get a specified container training job by its name. + + Args: + job_name: The name of the training job to get. + + Returns: + A container training job. 
+ """ + network = self.docker_client.networks.get(job_name) + + docker_containers = self.docker_client.containers.list( + filters={ + "label": [f"{constants.CONTAINER_TRAIN_JOB_NAME_LABEL}={job_name}"] + }, + all=True, + ) + + containers = [] + for container in docker_containers: + containers.append( + types.Container( + name=container.name, + status=container.status, + ), + ) + + return types.ContainerJob( + name=job_name, + creation_timestamp=datetime.fromisoformat(network.attrs["Created"]), + runtime_name=network.attrs["Labels"][constants.CONTAINER_RUNTIME_LABEL], + containers=containers, + status=self.__get_job_status(containers), + ) + + def get_job_logs( + self, + job_name: str, + follow: bool = False, + step: str = constants.NODE, + node_rank: int = 0, + ) -> Dict[str, str]: + """Gets container logs for the training job + + Args: + job_name (str): The name of the training job + follow (bool): If true, follows job logs and prints them to standard out (default False) + step (str): The training job step to target (default "node") + node_rank (int): The node rank to retrieve logs from (default 0) + + Returns: + Dict[str, str]: The logs of the training job, where the key is the + step and node rank, and the value is the logs for that node. + + Raises: + RuntimeError: If the job is not found. + """ + # TODO (eoinfennessy): use "step" in query.
+ containers = self.docker_client.containers.list( + all=True, + filters={ + "label": [ + f"{constants.CONTAINER_TRAIN_JOB_NAME_LABEL}={job_name}", + f"{constants.LOCAL_NODE_RANK_LABEL}={node_rank}", + ] + }, + ) + if len(containers) == 0: + raise RuntimeError(f"Could not find job '{job_name}'") + + logs: Dict[str, str] = {} + if follow: + for line in containers[0].logs(stream=True): + decoded = line.decode("utf-8") + print(decoded) + logs[f"{step}-{node_rank}"] = ( + logs.get(f"{step}-{node_rank}", "") + decoded + "\n" + ) + else: + logs[f"{step}-{node_rank}"] = containers[0].logs().decode() + return logs + + def list_jobs( + self, + runtime_name: Optional[str] = None, + ) -> List[types.ContainerJob]: + """Lists container training jobs. + + Args: + runtime_name: If provided, only return jobs that use the given runtime name. + + Returns: + A list of container training jobs. + """ + jobs = [] + for name in self.__list_job_names(runtime_name): + jobs.append(self.get_job(name)) + return jobs + + def delete_job(self, job_name: str) -> None: + """Deletes all resources associated with a Docker training job. + Args: + job_name (str): The name of the Docker training job. + """ + containers = self.docker_client.containers.list( + all=True, + filters={"label": f"{constants.CONTAINER_TRAIN_JOB_NAME_LABEL}={job_name}"}, + ) + for c in containers: + c.remove(force=True) + print(f"Removed container: {c.name}") + + network = self.docker_client.networks.get(job_name) + network.remove() + print(f"Removed network: {network.name}") + + def __list_job_names( + self, + runtime_name: Optional[str] = None, + ) -> List[str]: + """Lists the names of all Docker training jobs. + + Args: + runtime_name (Optional[str]): Filter by runtime name (default None) + + Returns: + List[str]: A list of Docker training job names. 
+ """ + + filters = {"label": [constants.CONTAINER_TRAIN_JOB_NAME_LABEL]} + if runtime_name is not None: + filters["label"].append( + f"{constants.CONTAINER_RUNTIME_LABEL}={runtime_name}" + ) + + # Because a network is created for each job, we use network names to list all jobs. + networks = self.docker_client.networks.list(filters=filters) + + job_names = [] + for n in networks: + job_names.append(n.name) + return job_names + + @staticmethod + def __get_container_environment( + framework: types.Framework, + head_node_address: str, + num_nodes: int, + node_rank: int, + ) -> Dict[str, str]: + if framework != types.Framework.TORCH: + raise RuntimeError(f"Framework '{framework}' is not currently supported.") + + return { + "PET_NNODES": str(num_nodes), + "PET_NPROC_PER_NODE": "1", + "PET_NODE_RANK": str(node_rank), + "PET_MASTER_ADDR": head_node_address, + "PET_MASTER_PORT": str(constants.TORCH_HEAD_NODE_PORT), + } + + @staticmethod + def __get_job_status(_: List[types.Container]) -> str: + # TODO (eoinfennessy): Discuss how to report status + return constants.UNKNOWN diff --git a/python/kubeflow/trainer/job_runners/job_runner.py b/python/kubeflow/trainer/job_runners/job_runner.py new file mode 100644 index 000000000..e5d56b8ee --- /dev/null +++ b/python/kubeflow/trainer/job_runners/job_runner.py @@ -0,0 +1,58 @@ +# Copyright 2025 The Kubeflow Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from abc import ABC, abstractmethod +from typing import Dict, List, Optional + +from kubeflow.trainer.constants import constants +from kubeflow.trainer.types import types + + +class JobRunner(ABC): + @abstractmethod + def create_job( + self, + image: str, + entrypoint: List[str], + command: List[str], + num_nodes: int, + framework: types.Framework, + runtime_name: str, + ) -> str: + pass + + @abstractmethod + def get_job(self, job_name: str) -> types.ContainerJob: + pass + + @abstractmethod + def get_job_logs( + self, + job_name: str, + follow: bool = False, + step: str = constants.NODE, + node_rank: int = 0, + ) -> Dict[str, str]: + pass + + @abstractmethod + def list_jobs( + self, + runtime_name: Optional[str] = None, + ) -> List[types.ContainerJob]: + pass + + @abstractmethod + def delete_job(self, job_name: str) -> None: + pass diff --git a/python/kubeflow/trainer/types/types.py b/python/kubeflow/trainer/types/types.py index 97b375c1d..d616d7a53 100644 --- a/python/kubeflow/trainer/types/types.py +++ b/python/kubeflow/trainer/types/types.py @@ -200,6 +200,26 @@ class TrainJob: status: Optional[str] = constants.UNKNOWN +# Representation for a container used in a local job. +@dataclass +class Container: + name: str + status: str + + def to_step(self) -> Step: + return Step(name=self.name, status=self.status, pod_name=self.name) + + +# Representation for a local container job. +@dataclass +class ContainerJob: + name: str + creation_timestamp: datetime + runtime_name: str + containers: List[Container] + status: Optional[str] = constants.UNKNOWN + + # Configuration for the HuggingFace dataset initializer. 
# TODO (andreyvelich): Discuss how to keep these configurations is sync with pkg.initializers.types @dataclass diff --git a/python/kubeflow/trainer/utils/utils.py b/python/kubeflow/trainer/utils/utils.py index d983d6f39..b37b2a5ff 100644 --- a/python/kubeflow/trainer/utils/utils.py +++ b/python/kubeflow/trainer/utils/utils.py @@ -15,9 +15,12 @@ import inspect import os import queue +import random import re +import string import textwrap import threading +import uuid from typing import Any, Callable, Dict, List, Optional, Tuple from urllib.parse import urlparse @@ -82,6 +85,30 @@ def get_container_devices( return device, str(device_count) +def get_runtime_from_crd( + runtime_crd: models.TrainerV1alpha1ClusterTrainingRuntime, +) -> types.Runtime: + + if not ( + runtime_crd.metadata + and runtime_crd.metadata.name + and runtime_crd.spec + and runtime_crd.spec.ml_policy + and runtime_crd.spec.template.spec + and runtime_crd.spec.template.spec.replicated_jobs + ): + raise Exception(f"ClusterTrainingRuntime CRD is invalid: {runtime_crd}") + + return types.Runtime( + name=runtime_crd.metadata.name, + trainer=get_runtime_trainer( + runtime_crd.spec.template.spec.replicated_jobs, + runtime_crd.spec.ml_policy, + runtime_crd.metadata, + ), + ) + + def get_runtime_trainer_container( replicated_jobs: List[models.JobsetV1alpha2ReplicatedJob], ) -> Optional[models.IoK8sApiCoreV1Container]: @@ -587,3 +614,7 @@ def get_args_in_dataset_preprocess_config( args.append(f"dataset.column_map={dataset_preprocess_config.column_map}") return args + + +def generate_train_job_name() -> str: + return random.choice(string.ascii_lowercase) + uuid.uuid4().hex[:11] diff --git a/python/pyproject.toml b/python/pyproject.toml index 8f701d981..3c17d0d97 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -29,7 +29,7 @@ classifiers = [ "Topic :: Software Development :: Libraries", "Topic :: Software Development :: Libraries :: Python Modules", ] -dependencies = ["kubernetes>=27.2.0", 
"pydantic>=2.10.0"] +dependencies = ["kubernetes>=27.2.0", "pydantic>=2.10.0", "docker>=7.1.0"] [project.urls] Homepage = "https://github.com/kubeflow/trainer" From a181db16502be61555c8ce8c020149e2c92ae4ba Mon Sep 17 00:00:00 2001 From: Eoin Fennessy Date: Thu, 5 Jun 2025 14:46:56 +0100 Subject: [PATCH 2/2] Move docstring Signed-off-by: Eoin Fennessy --- python/kubeflow/trainer/api/trainer_client.py | 46 ++++++++++--------- 1 file changed, 24 insertions(+), 22 deletions(-) diff --git a/python/kubeflow/trainer/api/trainer_client.py b/python/kubeflow/trainer/api/trainer_client.py index 3d3c82a31..5548f9e19 100644 --- a/python/kubeflow/trainer/api/trainer_client.py +++ b/python/kubeflow/trainer/api/trainer_client.py @@ -28,6 +28,30 @@ class TrainerClient(AbstractTrainerClient): + """TrainerClient constructor. Configure logging in your application + as follows to see detailed information from the TrainerClient APIs: + + .. code-block:: python + + import logging + logging.basicConfig() + log = logging.getLogger("kubeflow.trainer.api.trainer_client") + log.setLevel(logging.DEBUG) + + Args: + config_file: Path to the kube-config file. Defaults to ~/.kube/config. + context: Set the active context. Defaults to current_context from the kube-config. + client_configuration: Client configuration for cluster authentication. + You have to provide valid configuration with Bearer token or + with username and password. You can find an example here: + https://github.com/kubernetes-client/python/blob/67f9c7a97081b4526470cad53576bc3b71fa6fcc/examples/remote_cluster.py#L31 + namespace: Target Kubernetes namespace. If SDK runs outside of Kubernetes cluster it + takes the namespace from the kube-config context. If SDK runs inside + the Kubernetes cluster it takes namespace from the + `/var/run/secrets/kubernetes.io/serviceaccount/namespace` file. By default it + uses the `default` namespace. 
+ """ + def __init__( self, config_file: Optional[str] = None, @@ -35,28 +59,6 @@ def __init__( client_configuration: Optional[client.Configuration] = None, namespace: Optional[str] = None, ): - """TrainerClient constructor. Configure logging in your application - as follows to see detailed information from the TrainerClient APIs: - .. code-block:: python - import logging - logging.basicConfig() - log = logging.getLogger("kubeflow.trainer.api.trainer_client") - log.setLevel(logging.DEBUG) - - Args: - config_file: Path to the kube-config file. Defaults to ~/.kube/config. - context: Set the active context. Defaults to current_context from the kube-config. - client_configuration: Client configuration for cluster authentication. - You have to provide valid configuration with Bearer token or - with username and password. You can find an example here: - https://github.com/kubernetes-client/python/blob/67f9c7a97081b4526470cad53576bc3b71fa6fcc/examples/remote_cluster.py#L31 - namespace: Target Kubernetes namespace. If SDK runs outside of Kubernetes cluster it - takes the namespace from the kube-config context. If SDK runs inside - the Kubernetes cluster it takes namespace from the - `/var/run/secrets/kubernetes.io/serviceaccount/namespace` file. By default it - uses the `default` namespace. - """ - if namespace is None: namespace = utils.get_default_target_namespace(context)