Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add idle job preemption #1927

Merged
merged 22 commits into from
Apr 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions platform_api/kube_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from .cluster import Cluster
from .cluster_config import ClusterConfig
from .config import RegistryConfig, StorageConfig
from .orchestrator.kube_client import KubeClient, PodWatcher
from .orchestrator.kube_config import KubeConfig
from .orchestrator.kube_orchestrator import KubeOrchestrator, Orchestrator

Expand Down Expand Up @@ -47,15 +48,33 @@ async def init(self) -> None:

async def _init_orchestrator(self) -> None:
    """Build the kube client, pod watcher and orchestrator for this cluster.

    All three objects are registered on ``self._exit_stack`` so they are
    torn down together when the cluster closes.  The pod watcher is
    registered with the orchestrator *before* the watcher is started, so
    the preemption handlers are in place when watching begins.
    """
    logger.info(f"Cluster '{self.name}': initializing Orchestrator")
    # The kube client is created here (rather than inside KubeOrchestrator)
    # so it can be shared between the orchestrator and the pod watcher.
    kube_client = KubeClient(
        base_url=self._kube_config.endpoint_url,
        cert_authority_data_pem=self._kube_config.cert_authority_data_pem,
        cert_authority_path=self._kube_config.cert_authority_path,
        auth_type=self._kube_config.auth_type,
        auth_cert_path=self._kube_config.auth_cert_path,
        auth_cert_key_path=self._kube_config.auth_cert_key_path,
        token=self._kube_config.token,
        token_path=self._kube_config.token_path,
        namespace=self._kube_config.namespace,
        conn_timeout_s=self._kube_config.client_conn_timeout_s,
        read_timeout_s=self._kube_config.client_read_timeout_s,
        conn_pool_size=self._kube_config.client_conn_pool_size,
        trace_configs=self._trace_configs,
    )
    pod_watcher = PodWatcher(kube_client)
    orchestrator = KubeOrchestrator(
        cluster_name=self.name,
        storage_configs=self._storage_configs,
        registry_config=self._registry_config,
        orchestrator_config=self._cluster_config.orchestrator,
        kube_config=self._kube_config,
        # NOTE(review): this PR appears to drop the ``trace_configs``
        # parameter from KubeOrchestrator.__init__ (tracing now goes
        # through the shared kube_client) — confirm this argument is
        # still accepted, otherwise it should be removed here.
        trace_configs=self._trace_configs,
        kube_client=kube_client,
    )
    await self._exit_stack.enter_async_context(orchestrator)
    # Attach preemption handlers before the watcher starts streaming events.
    orchestrator.register(pod_watcher)
    await self._exit_stack.enter_async_context(kube_client)
    await self._exit_stack.enter_async_context(pod_watcher)
    self._orchestrator = orchestrator

async def close(self) -> None:
Expand Down
10 changes: 10 additions & 0 deletions platform_api/orchestrator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,16 @@ async def get_job_status(self, job: Job) -> JobStatusItem:
async def delete_job(self, job: Job) -> JobStatus:
pass

@abstractmethod
async def preempt_jobs(
    self, jobs_to_schedule: list[Job], preemptible_jobs: list[Job]
) -> list[Job]:
    """Preempt jobs from *preemptible_jobs* to free resources for *jobs_to_schedule*.

    Returns the subset of *preemptible_jobs* that were actually preempted
    (possibly empty if no preemption could free enough resources).
    """
    pass

@abstractmethod
async def preempt_idle_jobs(self, jobs_to_schedule: list[Job]) -> bool:
    """Preempt idle jobs to free resources for *jobs_to_schedule*.

    Returns ``True`` if at least one idle job was preempted,
    ``False`` otherwise.
    """
    pass

@abstractmethod
async def get_missing_secrets(
self, secret_path: str, secret_names: list[str]
Expand Down
27 changes: 19 additions & 8 deletions platform_api/orchestrator/kube_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1597,6 +1597,10 @@ def _parse_memory(cls, memory: str) -> int:
return int(memory[:-2]) * 1024
raise ValueError("Memory format is not supported")

@property
def any(self) -> bool:
    """Return ``True`` if any tracked resource (CPU, memory, GPU) is non-zero.

    ``gpu`` may be ``None`` rather than ``0`` — callers elsewhere in this
    module guard with ``resources.gpu or 0``, and the ``_has_resources``
    helper this property replaces did the same.  Treat ``None`` as zero
    here instead of raising ``TypeError`` on the ``>`` comparison.
    """
    return self.cpu_mcores > 0 or self.memory > 0 or (self.gpu or 0) > 0

@property
def cpu_mcores(self) -> int:
return int(self.cpu * 1000)
Expand Down Expand Up @@ -2313,6 +2317,17 @@ async def wait_pod_is_terminated(
return
await asyncio.sleep(interval_s)

async def wait_pod_is_deleted(
    self, pod_name: str, timeout_s: float = 10.0 * 60, interval_s: float = 1.0
) -> None:
    """Poll until the named pod no longer exists.

    Returns once ``get_pod`` raises ``JobNotFoundException`` (i.e. the pod
    is gone); raises ``asyncio.TimeoutError`` if that does not happen
    within *timeout_s* seconds.  Polls every *interval_s* seconds.
    """
    async with timeout(timeout_s):
        while True:
            try:
                await self.get_pod(pod_name)
            except JobNotFoundException:
                # "Not found" is the success condition: deletion completed.
                return
            await asyncio.sleep(interval_s)

async def create_default_network_policy(
self,
name: str,
Expand Down Expand Up @@ -2460,6 +2475,8 @@ async def stop(self) -> None:
self._watcher_task.cancel()
with suppress(asyncio.CancelledError):
await self._watcher_task
self._watcher_task = None
self._handlers.clear()

async def _run(self, resource_version: str) -> None:
while True:
Expand Down Expand Up @@ -2497,7 +2514,7 @@ def get_pods_to_preempt(
if resources.gpu:
pods = [p for p in pods if p.resources and p.resources.gpu]
pods_to_preempt: list[PodDescriptor] = []
while pods and cls._has_resources(resources):
while pods and resources.any:
logger.debug("Pods left: %d", len(pods))
logger.debug("Resources left: %s", resources)
# max distance for a single resource is 1, 3 resources total
Expand Down Expand Up @@ -2528,18 +2545,12 @@ def get_pods_to_preempt(
resources.memory,
resources.gpu or 0,
)
if cls._has_resources(resources):
if resources.any:
logger.debug("Pods to preempt: []")
return []
logger.debug("Pods to preempt: %s", [p.name for p in pods_to_preempt])
return pods_to_preempt

@classmethod
def _has_resources(cls, resources: NodeResources) -> bool:
return (
resources.cpu_mcores > 0 or resources.memory > 0 or (resources.gpu or 0) > 0
)

@classmethod
def _subtract_resources(cls, r1: NodeResources, r2: Resources) -> NodeResources:
return replace(
Expand Down
61 changes: 34 additions & 27 deletions platform_api/orchestrator/kube_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
from pathlib import PurePath
from typing import Any, Optional, Union

import aiohttp

from platform_api.cluster_config import OrchestratorConfig
from platform_api.config import RegistryConfig, StorageConfig
from platform_api.resource import ResourcePoolType
Expand Down Expand Up @@ -42,6 +40,7 @@
PodExec,
PodRestartPolicy,
PodStatus,
PodWatcher,
PVCVolume,
SecretVolume,
Service,
Expand All @@ -50,6 +49,7 @@
VolumeMount,
)
from .kube_config import KubeConfig
from .kube_orchestrator_preemption import KubeOrchestratorPreemption

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -122,33 +122,16 @@ def __init__(
registry_config: RegistryConfig,
orchestrator_config: OrchestratorConfig,
kube_config: KubeConfig,
trace_configs: Optional[list[aiohttp.TraceConfig]] = None,
kube_client: KubeClient,
) -> None:
self._loop = asyncio.get_event_loop()
self._cluster_name = cluster_name
self._storage_configs = storage_configs
self._registry_config = registry_config
self._orchestrator_config = orchestrator_config
self._kube_config = kube_config

# TODO (A Danshyn 05/21/18): think of the namespace life-time;
# should we ensure it does exist before continuing

self._client = KubeClient(
base_url=kube_config.endpoint_url,
cert_authority_data_pem=kube_config.cert_authority_data_pem,
cert_authority_path=kube_config.cert_authority_path,
auth_type=kube_config.auth_type,
auth_cert_path=kube_config.auth_cert_path,
auth_cert_key_path=kube_config.auth_cert_key_path,
token=kube_config.token,
token_path=kube_config.token_path,
namespace=kube_config.namespace,
conn_timeout_s=kube_config.client_conn_timeout_s,
read_timeout_s=kube_config.client_read_timeout_s,
conn_pool_size=kube_config.client_conn_pool_size,
trace_configs=trace_configs,
)
self._client = kube_client
self._preemption = KubeOrchestratorPreemption(kube_client)

# TODO (A Danshyn 11/16/18): make this configurable at some point
self._docker_secret_name_prefix = "neurouser-"
Expand All @@ -167,12 +150,13 @@ def orchestrator_config(self) -> OrchestratorConfig:
def kube_config(self) -> KubeConfig:
return self._kube_config

# TODO: remove
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cleanup later to avoid unnecessary changes in pr

async def __aenter__(self) -> "KubeOrchestrator":
    """Enter the underlying kube client's context and return self.

    NOTE(review): marked ``TODO: remove`` in this change — the kube client
    is now created and entered by the caller (KubeCluster puts it on its
    exit stack), so this enters the same client a second time.  Confirm
    double-enter is harmless before the TODO cleanup lands.
    """
    await self._client.__aenter__()
    return self

# TODO: remove
async def __aexit__(self, *args: Any) -> None:
await self._client.close()
pass

@property
def _main_storage_config(self) -> StorageConfig:
Expand All @@ -189,6 +173,9 @@ def _extra_storage_configs(self) -> list[StorageConfig]:
result.append(sc)
return result

def register(self, pod_watcher: PodWatcher) -> None:
    """Subscribe the preemption machinery to pod watcher events.

    Delegates to ``KubeOrchestratorPreemption.register`` so preemption
    handlers receive pod updates from *pod_watcher*.
    """
    self._preemption.register(pod_watcher)

def create_storage_volumes(
self, container_volume: ContainerVolume
) -> Sequence[PathVolume]:
Expand Down Expand Up @@ -333,7 +320,7 @@ async def _delete_pod_network_policy(self, job: Job) -> None:
except Exception as e:
logger.warning(f"Failed to remove network policy {name}: {e}")

async def _create_pod_descriptor(
def _create_pod_descriptor(
self, job: Job, tolerate_unreachable_node: bool = False
) -> PodDescriptor:
pool_types = self._get_cheapest_pool_types(job)
Expand Down Expand Up @@ -541,7 +528,7 @@ async def start_job(
try:
await self._create_pod_network_policy(job)

descriptor = await self._create_pod_descriptor(
descriptor = self._create_pod_descriptor(
job, tolerate_unreachable_node=tolerate_unreachable_node
)
pod = await self._client.create_pod(descriptor)
Expand Down Expand Up @@ -744,7 +731,7 @@ async def _check_restartable_job_pod(self, job: Job) -> PodDescriptor:

if do_recreate_pod:
logger.info(f"Recreating preempted pod '{pod_name}'. Job '{job.id}'")
descriptor = await self._create_pod_descriptor(job)
descriptor = self._create_pod_descriptor(job)
try:
pod = await self._client.create_pod(descriptor)
except JobError:
Expand Down Expand Up @@ -875,3 +862,23 @@ async def delete_all_job_resources(self, job_id: str) -> None:
await self._client.delete_all_ingresses(labels=labels)
await self._client.delete_all_services(labels=labels)
await self._client.delete_all_network_policies(labels=labels)

async def preempt_jobs(
    self, jobs_to_schedule: list[Job], preemptible_jobs: list[Job]
) -> list[Job]:
    """Free resources for *jobs_to_schedule* by preempting pods of *preemptible_jobs*.

    Builds pod descriptors for both job lists, asks the preemption engine
    which preemptible pods to evict, and returns the jobs whose pods were
    actually preempted (matched by id — pods appear to be named after
    their job ids; empty list if nothing was preempted).
    """
    pods_to_schedule = [
        self._create_pod_descriptor(job) for job in jobs_to_schedule
    ]
    candidate_pods = [
        self._create_pod_descriptor(job) for job in preemptible_jobs
    ]
    preempted = await self._preemption.preempt_pods(
        pods_to_schedule, candidate_pods
    )
    preempted_names = {pod.name for pod in preempted}
    return [job for job in preemptible_jobs if job.id in preempted_names]

async def preempt_idle_jobs(self, jobs_to_schedule: list[Job]) -> bool:
    """Preempt idle pods to make room for *jobs_to_schedule*.

    Returns ``True`` if the preemption engine evicted at least one idle
    pod, ``False`` otherwise.
    """
    pods = [self._create_pod_descriptor(job) for job in jobs_to_schedule]
    preempted = await self._preemption.preempt_idle_pods(pods)
    return bool(preempted)
Loading