|
20 | 20 | import re |
21 | 21 | import string |
22 | 22 | import time |
23 | | -from typing import Optional, Union |
| 23 | +from typing import List, Optional, Union |
24 | 24 | import uuid |
25 | 25 |
|
26 | 26 | from kubeflow_trainer_api import models |
@@ -57,6 +57,54 @@ def __init__( |
57 | 57 |
|
58 | 58 | self.namespace = cfg.namespace |
59 | 59 |
|
def _select_best_pod_for_role(
    self, pods: List[models.IoK8sApiCoreV1Pod]
) -> Optional[models.IoK8sApiCoreV1Pod]:
    """
    Select the best Pod for a role based on status priority and creation timestamp.

    Priority order (highest first):
        1. Running Pods
        2. Succeeded Pods
        3. Failed Pods
        4. Pending Pods
        5. Unknown Pods (also used when the Pod has no status or its phase
           is not one of the phases listed above)

    Among the Pods that share the highest priority present, the one with the
    most recent creation timestamp is returned. A Pod without a creation
    timestamp ranks as older than any Pod that has one. Exact ties keep the
    input order (the earliest Pod in ``pods`` wins).

    Args:
        pods: Pods that belong to the same role. May be empty.

    Returns:
        The selected Pod, or None if ``pods`` is empty.
    """
    if not pods:
        return None

    # Pod status priority (higher number = higher priority).
    status_priority = {
        constants.POD_RUNNING: 4,
        constants.POD_SUCCEEDED: 3,
        constants.POD_FAILED: 2,
        constants.POD_PENDING: 1,
        constants.POD_UNKNOWN: 0,
    }

    def rank(pod):
        phase = pod.status.phase if pod.status else constants.POD_UNKNOWN
        created = pod.metadata.creation_timestamp if pod.metadata else None
        # The "has a timestamp" flag is compared before the timestamp itself,
        # so a missing timestamp (None) is never ordered against a real one.
        # Substituting "" for a missing timestamp raises TypeError as soon as
        # it is compared with a non-string timestamp object.
        return (status_priority.get(phase, 0), created is not None, created)

    # max() returns the first maximal element, which matches the previous
    # stable "sort descending, take the first" tie-breaking.
    return max(pods, key=rank)
| 107 | + |
60 | 108 | def list_runtimes(self) -> list[types.Runtime]: |
61 | 109 | result = [] |
62 | 110 | try: |
@@ -522,19 +570,20 @@ def __get_trainjob_from_crd( |
522 | 570 | int(pod.metadata.labels[constants.JOB_INDEX_LABEL]) |
523 | 571 | ) |
524 | 572 |
|
525 | | - # Keep only the most recently created Pod for each role |
| 573 | + # Collect all Pods for this role |
526 | 574 | if role_key not in pods_by_role: |
527 | | - pods_by_role[role_key] = pod |
528 | | - else: |
529 | | - # Compare creation timestamps to keep the most recent |
530 | | - current_pod = pods_by_role[role_key] |
531 | | - if (pod.metadata.creation_timestamp and |
532 | | - current_pod.metadata.creation_timestamp and |
533 | | - pod.metadata.creation_timestamp > current_pod.metadata.creation_timestamp): |
534 | | - pods_by_role[role_key] = pod |
535 | | - |
536 | | - # Process only the most recent Pod for each role |
537 | | - for role_key, pod in pods_by_role.items(): |
| 575 | + pods_by_role[role_key] = [] |
| 576 | + pods_by_role[role_key].append(pod) |
| 577 | + |
| 578 | + # Select the best Pod for each role using status-priority logic |
| 579 | + selected_pods = {} |
| 580 | + for role_key, pods in pods_by_role.items(): |
| 581 | + best_pod = self._select_best_pod_for_role(pods) |
| 582 | + if best_pod: |
| 583 | + selected_pods[role_key] = best_pod |
| 584 | + |
| 585 | + # Process only the selected Pod for each role |
| 586 | + for role_key, pod in selected_pods.items(): |
538 | 587 | replicated_job_name, job_index = role_key |
539 | 588 |
|
540 | 589 | # Get the Initializer step. |
|
0 commit comments