Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
zubenkoivan committed Apr 22, 2022
1 parent ddbf492 commit 29674ae
Show file tree
Hide file tree
Showing 3 changed files with 191 additions and 194 deletions.
14 changes: 6 additions & 8 deletions platform_api/orchestrator/kube_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1597,6 +1597,10 @@ def _parse_memory(cls, memory: str) -> int:
return int(memory[:-2]) * 1024
raise ValueError("Memory format is not supported")

@property
def any(self) -> bool:
return self.cpu_mcores > 0 or self.memory > 0 or self.gpu > 0

@property
def cpu_mcores(self) -> int:
return int(self.cpu * 1000)
Expand Down Expand Up @@ -2510,7 +2514,7 @@ def get_pods_to_preempt(
if resources.gpu:
pods = [p for p in pods if p.resources and p.resources.gpu]
pods_to_preempt: list[PodDescriptor] = []
while pods and cls._has_resources(resources):
while pods and resources.any:
logger.debug("Pods left: %d", len(pods))
logger.debug("Resources left: %s", resources)
# max distance for a single resource is 1, 3 resources total
Expand Down Expand Up @@ -2541,18 +2545,12 @@ def get_pods_to_preempt(
resources.memory,
resources.gpu or 0,
)
if cls._has_resources(resources):
if resources.any:
logger.debug("Pods to preempt: []")
return []
logger.debug("Pods to preempt: %s", [p.name for p in pods_to_preempt])
return pods_to_preempt

@classmethod
def _has_resources(cls, resources: NodeResources) -> bool:
return (
resources.cpu_mcores > 0 or resources.memory > 0 or (resources.gpu or 0) > 0
)

@classmethod
def _subtract_resources(cls, r1: NodeResources, r2: Resources) -> NodeResources:
return replace(
Expand Down
77 changes: 33 additions & 44 deletions platform_api/orchestrator/kube_orchestrator_preemption.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
NotFoundException,
PodDescriptor,
PodEventHandler,
PodStatus,
PodWatcher,
PodWatchEvent,
WatchEventType,
Expand All @@ -23,51 +24,43 @@
logger = logging.getLogger(__name__)


class _Pod(dict[str, Any]):
class _Pod:
def __init__(self, payload: dict[str, Any]) -> None:
self._payload = payload
self._status = PodStatus(payload["status"])

@property
def payload(self) -> dict[str, Any]:
return self._payload

@property
def name(self) -> str:
return self["metadata"]["name"]
return self._payload["metadata"]["name"]

@property
def labels(self) -> dict[str, str]:
return self["metadata"].get("labels", {})
return self._payload["metadata"].get("labels", {})

@property
def is_idle(self) -> bool:
return bool(self.labels.get("platform.neuromation.io/idle"))

@property
def node_name(self) -> str | None:
return self["spec"].get("nodeName")

@property
def is_pending(self) -> bool:
return self["status"]["phase"] == "Pending"

@property
def is_waiting_for_node(self) -> bool:
return self.is_pending and not bool(self["spec"].get("nodeName"))

@property
def is_scheduled(self) -> bool:
return self.is_pending and bool(self.node_name)

@property
def is_running(self) -> bool:
return self["status"]["phase"] == "Running"
def is_terminating(self) -> bool:
return bool(self._payload["metadata"].get("deletionTimestamp"))

@property
def is_bound_to_node(self) -> bool:
return self.is_scheduled or self.is_running
def status(self) -> PodStatus:
return self._status

@property
def is_terminating(self) -> bool:
return bool(self["metadata"].get("deletionTimestamp"))
def node_name(self) -> str:
return self._payload["spec"]["nodeName"]

@property
def resource_requests(self) -> NodeResources:
pod_resources = NodeResources()
for container in self["spec"]["containers"]:
for container in self._payload["spec"]["containers"]:
resources = container.get("resources")
if not resources:
continue
Expand All @@ -88,25 +81,23 @@ def __init__(self, kube_client: KubeClient) -> None:
async def init(self, raw_pods: list[dict[str, Any]]) -> None:
for raw_pod in raw_pods:
pod = _Pod(raw_pod)
if pod.is_bound_to_node:
if pod.status.is_scheduled and not pod.status.is_terminated:
await self._add_pod(pod)

async def handle(self, event: PodWatchEvent) -> None:
pod = _Pod(event.raw_pod)
if pod.is_waiting_for_node:
if not pod.status.is_scheduled:
return
if event.type == WatchEventType.DELETED:
if event.type == WatchEventType.DELETED or pod.status.is_terminated:
self._remove_pod(pod)
elif pod.is_bound_to_node:
await self._add_pod(pod)
else:
self._remove_pod(pod)
await self._add_pod(pod)

async def _add_pod(self, pod: _Pod) -> None:
pod_name = pod.name
if pod_name in self._pod_names:
return
node_name: str = pod.node_name # type: ignore
node_name = pod.node_name
# Ignore error in case node was removed/lost but pod was not yet removed
with suppress(NotFoundException):
if node_name not in self._nodes:
Expand All @@ -120,7 +111,7 @@ def _remove_pod(self, pod: _Pod) -> None:
pod_name = pod.name
if pod_name not in self._pod_names:
return
node_name: str = pod.node_name # type: ignore
node_name = pod.node_name
node = self._nodes.get(node_name)
if node:
node_free_resources = self._node_free_resources[node_name]
Expand Down Expand Up @@ -157,33 +148,31 @@ def __init__(self) -> None:
async def init(self, raw_pods: list[dict[str, Any]]) -> None:
for raw_pod in raw_pods:
pod = _Pod(raw_pod)
if pod.is_idle and pod.is_bound_to_node:
if pod.is_idle and pod.status.is_scheduled and not pod.status.is_terminated:
self._add_pod(pod)

async def handle(self, event: PodWatchEvent) -> None:
pod = _Pod(event.raw_pod)
if not pod.is_idle or pod.is_waiting_for_node:
if not pod.is_idle or not pod.status.is_scheduled:
return
if event.type == WatchEventType.DELETED:
if event.type == WatchEventType.DELETED or pod.status.is_terminated:
self._notify_pod_terminating(pod) # in case it's force delete
self._remove_pod(pod)
elif pod.is_bound_to_node:
else:
self._add_pod(pod)
if pod.is_terminating:
self._notify_pod_terminating(pod)
else:
self._remove_pod(pod)

def _add_pod(self, pod: _Pod) -> None:
pod_name = pod.name
node_name: str = pod.node_name # type: ignore
node_name = pod.node_name
# there is an issue in k8s, elements in items don't have kind and version
pod["kind"] = "Pod"
pod.payload["kind"] = "Pod"
self._pod_names.add(pod_name)
self._pods[node_name][pod_name] = PodDescriptor.from_primitive(pod)
self._pods[node_name][pod_name] = PodDescriptor.from_primitive(pod.payload)

def _remove_pod(self, pod: _Pod) -> None:
node_name: str = pod.node_name # type: ignore
node_name = pod.node_name
pod_name = pod.name
pods = self._pods[node_name]
pods.pop(pod_name, None)
Expand Down
Loading

0 comments on commit 29674ae

Please sign in to comment.