support job priority
zubenkoivan committed May 9, 2022
1 parent 5173e93 commit 7db4644
Showing 8 changed files with 1,331 additions and 1,206 deletions.
14 changes: 7 additions & 7 deletions platform_api/cluster_config.py
@@ -23,6 +23,13 @@ class OrchestratorConfig:
allow_privileged_mode: bool = False
allow_job_priority: bool = False

@property
def allow_scheduler_enabled_job(self) -> bool:
for preset in self.presets:
if preset.scheduler_enabled:
return True
return False

@property
def tpu_resources(self) -> Sequence[TPUResource]:
return tuple(
@@ -36,13 +43,6 @@ def tpu_ipv4_cidr_block(self) -> Optional[str]:
return None
return tpus[0].ipv4_cidr_block

@property
def has_scheduler_enabled_presets(self) -> bool:
for preset in self.presets:
if preset.scheduler_enabled:
return True
return False


@dataclass(frozen=True)
class IngressConfig:
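
For readers skimming the diff: the property formerly named has_scheduler_enabled_presets is now allow_scheduler_enabled_job, with unchanged behaviour. Below is a minimal, self-contained sketch of it, assuming stripped-down Preset and config classes that carry only the fields used here (the preset names are made up); the any() form is simply an equivalent way of writing the loop shown above.

    from dataclasses import dataclass
    from typing import Sequence


    @dataclass(frozen=True)
    class Preset:
        name: str
        scheduler_enabled: bool = False


    @dataclass(frozen=True)
    class OrchestratorConfigSketch:
        presets: Sequence[Preset] = ()
        allow_job_priority: bool = False

        @property
        def allow_scheduler_enabled_job(self) -> bool:
            # True as soon as any preset opts into the job scheduler.
            return any(preset.scheduler_enabled for preset in self.presets)


    config = OrchestratorConfigSketch(
        presets=(Preset("cpu"), Preset("gpu-scheduled", scheduler_enabled=True)),
    )
    assert config.allow_scheduler_enabled_job is True
    assert OrchestratorConfigSketch().allow_scheduler_enabled_job is False
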
4 changes: 4 additions & 0 deletions platform_api/orchestrator/base.py
@@ -23,6 +23,10 @@ async def preempt_jobs(
) -> list[Job]:
pass

@abstractmethod
async def get_scheduled_jobs(self, jobs: list[Job]) -> list[Job]:
pass

@abstractmethod
async def get_schedulable_jobs(self, jobs: list[Job]) -> list[Job]:
pass
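
To make the two hooks easier to compare, here is a hedged sketch of the relevant slice of the Orchestrator contract, with Job reduced to a bare id; the docstrings are my reading of the Kubernetes implementation in the next file (pods already bound to a node versus pods that would currently fit), not text taken from the source.

    from abc import ABC, abstractmethod
    from dataclasses import dataclass


    @dataclass
    class Job:
        id: str


    class Orchestrator(ABC):
        @abstractmethod
        async def get_scheduled_jobs(self, jobs: list[Job]) -> list[Job]:
            """Return the subset of jobs whose pods are already placed on a node."""

        @abstractmethod
        async def get_schedulable_jobs(self, jobs: list[Job]) -> list[Job]:
            """Return the subset of jobs that would fit the free resources right now."""

In short, get_scheduled_jobs answers "has this job actually landed on a node?", while get_schedulable_jobs answers "could it land right now?"; the poller changes below lean on that distinction.
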
7 changes: 7 additions & 0 deletions platform_api/orchestrator/kube_orchestrator.py
@@ -890,6 +890,13 @@ async def preempt_jobs(
await self.delete_job(job)
return preempted_jobs

async def get_scheduled_jobs(self, jobs: list[Job]) -> list[Job]:
scheduled = []
for job in jobs:
if self._scheduler.is_pod_scheduled(job.id):
scheduled.append(job)
return scheduled

async def get_schedulable_jobs(self, jobs: list[Job]) -> list[Job]:
job_pods = [self._create_pod_descriptor(job) for job in jobs]
schedulable_pods = self._scheduler.get_schedulable_pods(job_pods)
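
The Kubernetes implementation simply asks its internal scheduler which pods are already bound. A compact, runnable rendering of that filter is below; FakeJob and FakeScheduler are stand-ins, and only the is_pod_scheduled call mirrors the diff.

    import asyncio
    from dataclasses import dataclass


    @dataclass
    class FakeJob:
        id: str


    class FakeScheduler:
        def __init__(self, bound_pod_ids: set[str]) -> None:
            self._bound_pod_ids = bound_pod_ids

        def is_pod_scheduled(self, pod_id: str) -> bool:
            # The real scheduler checks whether the job's pod is bound to a node.
            return pod_id in self._bound_pod_ids


    async def get_scheduled_jobs(
        scheduler: FakeScheduler, jobs: list[FakeJob]
    ) -> list[FakeJob]:
        return [job for job in jobs if scheduler.is_pod_scheduled(job.id)]


    jobs = [FakeJob("job-a"), FakeJob("job-b")]
    scheduled = asyncio.run(get_scheduled_jobs(FakeScheduler({"job-a"}), jobs))
    assert [job.id for job in scheduled] == ["job-a"]
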
91 changes: 58 additions & 33 deletions platform_api/orchestrator/poller_service.py
@@ -187,11 +187,15 @@ async def schedule(self, unfinished: list[JobRecord]) -> SchedulingResult:
):
jobs_to_start.append(job)

# Always give priority to materialized jobs
# as they are already created in the orchestrator
jobs_to_start.sort(
key=lambda job: (
job.materialized,
job.priority,
now - job.status_history.current.transition_time,
)
now - job.status_history.created_at,
),
reverse=True,
)

return SchedulingResult(
@@ -319,33 +323,35 @@ async def _start_jobs_wrapper(
self, records_to_start: list[JobRecord], records_to_suspend: list[JobRecord]
) -> None:
try:
async with AsyncExitStack() as stack:
for record in itertools.chain(records_to_start, records_to_suspend):
await stack.enter_async_context(self._update_job(record))
try:
async with self._cluster_holder.get() as cluster:
async with AsyncExitStack() as stack:
for record in itertools.chain(
records_to_start, records_to_suspend
):
await stack.enter_async_context(self._update_job(record))

try:
async with self._cluster_holder.get() as cluster:
await self._start_jobs(
cluster, records_to_start, records_to_suspend
)
except ClusterNotFound as cluster_err:
for record in itertools.chain(records_to_start, records_to_suspend):
# marking PENDING/RUNNING job as FAILED
logger.warning(
"Failed to get job '%s' status. Reason: %s",
record.id,
cluster_err,
)
record.status_history.current = JobStatusItem.create(
JobStatus.FAILED,
reason=JobStatusReason.CLUSTER_NOT_FOUND,
description=str(cluster_err),
)
record.materialized = False
await self._revoke_pass_config(record)
except ClusterNotAvailable:
# skipping job status update
pass
except ClusterNotFound as cluster_err:
for record in itertools.chain(records_to_start, records_to_suspend):
# marking PENDING/RUNNING job as FAILED
logger.warning(
"Failed to get job '%s' status. Reason: %s",
record.id,
cluster_err,
)
record.status_history.current = JobStatusItem.create(
JobStatus.FAILED,
reason=JobStatusReason.CLUSTER_NOT_FOUND,
description=str(cluster_err),
)
record.materialized = False
await self._revoke_pass_config(record)
except ClusterNotAvailable:
# skipping job status update
pass
except JobStorageTransactionError:
# intentionally ignoring any transaction failures here because
# the job may have been changed and a retry is needed.
@@ -361,7 +367,10 @@ async def _start_jobs(
if not records_to_start:
return

if not cluster.config.orchestrator.has_scheduler_enabled_presets:
if (
not cluster.config.orchestrator.allow_scheduler_enabled_job
and not cluster.config.orchestrator.allow_job_priority
):
# Clusters without scheduler_enabled presets
# and job priorities. We can start all jobs at once.
for record in records_to_start:
@@ -371,32 +380,47 @@ async def _start_jobs(

jobs_to_start = [self._make_job(r, cluster) for r in records_to_start]
jobs_to_suspend = [self._make_job(r, cluster) for r in records_to_suspend]
scheduled_jobs = await cluster.orchestrator.get_scheduled_jobs(
jobs_to_start
)
scheduled_job_ids = {r.id for r in scheduled_jobs}
schedulable_jobs = await cluster.orchestrator.get_schedulable_jobs(
jobs_to_start
)
schedulable_job_ids = {r.id for r in schedulable_jobs}
stop_materializing = False

for job in jobs_to_start:
if job.materialized:
if job.id in scheduled_job_ids:
await self._update_job_status(cluster.orchestrator, job)
continue
elif stop_materializing:
break
if job.id in schedulable_job_ids:
await self._update_job_status(cluster.orchestrator, job)
# If there are enough resources in cluster proceed to the next
# job to start multiple jobs in parallel. It can speed up
# jobs start time if cluster needs to preempt idle jobs
# Do not materialize next jobs until job is scheduled
stop_materializing = True
continue
suspended_jobs = await self._suspend_jobs(
cluster.orchestrator, job, jobs_to_suspend
)
if suspended_jobs:
# Do not proceed to other jobs until job is scheduled
break
await self._update_job_status(cluster.orchestrator, job)
# Do not materialize next jobs until job is scheduled
stop_materializing = True
continue
# For some reason there are no resources for the job, it can
# be skipped until other jobs finish and free some resources.

# Job could be materialized because we expected that there are
# Even if there are no free resources job could be materialized
# during previous poller cycles because we expected that there are
# enough resources.
if job.materialized:
await cluster.orchestrator.delete_job(job)
await self._revoke_pass_config(job)
job.materialized = False

for job in jobs_to_suspend:
if job.status != JobStatus.SUSPENDED:
await self._update_job_status(cluster.orchestrator, job)
@@ -410,7 +434,7 @@ async def _suspend_jobs(
orchestrator: Orchestrator,
job_to_start: Job,
jobs_to_suspend: list[Job],
) -> None:
) -> list[Job]:
jobs_to_suspend = [
job for job in jobs_to_suspend if job_to_start.priority >= job.priority
]
@@ -420,6 +444,7 @@ async def _suspend_jobs(
for job in suspended_jobs:
job.status = JobStatus.SUSPENDED
job.materialized = False
return suspended_jobs

async def _update_job_status_wrapper(self, job_record: JobRecord) -> None:
try:
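
The heart of the change is the new ordering plus the stop_materializing gate: materialized jobs go first, then higher-priority jobs, then older jobs, and once a job is waiting to be scheduled (or waiting for lower-priority jobs to be suspended) no further jobs are materialized behind it. A standalone sketch of the sort key follows; JobRecordSketch, the integer priorities and the timestamps are illustrative stand-ins for the real JobRecord model.

    from dataclasses import dataclass
    from datetime import datetime, timedelta, timezone


    @dataclass
    class JobRecordSketch:
        id: str
        materialized: bool
        priority: int  # higher value = higher priority (illustrative)
        created_at: datetime


    now = datetime.now(timezone.utc)
    jobs_to_start = [
        JobRecordSketch("old-low-priority", False, 0, now - timedelta(hours=2)),
        JobRecordSketch("fresh-high-priority", False, 10, now - timedelta(minutes=1)),
        JobRecordSketch("already-materialized", True, 0, now - timedelta(minutes=5)),
    ]

    # Same key shape as in JobsScheduler.schedule: materialized first, then
    # priority, then age (a larger "now - created_at" means an older job).
    jobs_to_start.sort(
        key=lambda job: (job.materialized, job.priority, now - job.created_at),
        reverse=True,
    )

    assert [job.id for job in jobs_to_start] == [
        "already-materialized",
        "fresh-high-priority",
        "old-low-priority",
    ]
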
27 changes: 27 additions & 0 deletions tests/integration/test_kube_orchestrator.py
@@ -2151,6 +2151,33 @@ async def start_watchers(
async with exit_stack:
yield

@pytest.mark.usefixtures("start_watchers")
async def test_get_scheduled_jobs(
self, kube_client: MyKubeClient, kube_orchestrator: KubeOrchestrator
) -> None:
container = Container(
image="ubuntu:20.10",
resources=ContainerResources(cpu=0.1, memory_mb=64),
)
job = MyJob(
orchestrator=kube_orchestrator,
record=JobRecord.create(
name=f"job-{uuid.uuid4().hex[:6]}",
owner="owner1",
request=JobRequest.create(container),
cluster_name="test-cluster",
),
)

scheduled = await kube_orchestrator.get_scheduled_jobs([job])
assert scheduled == []

await kube_orchestrator.start_job(job)
await kube_client.wait_pod_is_running(pod_name=job.id, timeout_s=60.0)

scheduled = await kube_orchestrator.get_scheduled_jobs([job])
assert scheduled == [job]

@pytest.mark.usefixtures("start_watchers")
async def test_get_schedulable_jobs(
self, kube_orchestrator: KubeOrchestrator, node_resources: NodeResources
48 changes: 36 additions & 12 deletions tests/unit/conftest.py
@@ -78,8 +78,10 @@ def __init__(self, config: ClusterConfig) -> None:
self.current_datetime_factory: Callable[[], datetime] = partial(
datetime.now, timezone.utc
)
self._successfully_deleted_jobs: list[Job] = []
self._preempted_jobs: list[Job] = []
self._deleted_job_ids: list[str] = []
self._preemptible_job_ids: list[str] = []
self._scheduled_job_ids: list[str] = []
self._schedulable_job_ids: list[str] = []

@property
def config(self) -> OrchestratorConfig:
@@ -121,10 +123,13 @@ async def get_job_status(self, job: Job) -> JobStatusItem:
def _create_delete_job_exc(self, job: Job) -> Exception:
return JobError()

def get_deleted_job_ids(self) -> list[str]:
return self._deleted_job_ids

async def delete_job(self, job: Job) -> JobStatus:
if self.raise_on_delete:
raise self.delete_job_exc_factory(job)
self._successfully_deleted_jobs.append(job)
self._deleted_job_ids.append(job.id)
return JobStatus.SUCCEEDED

def update_status_to_return(self, new_status: JobStatus) -> None:
@@ -151,9 +156,6 @@ def update_exit_code_to_return_single(
) -> None:
self._mock_exit_codes[job_id] = new_exit_code

def get_successfully_deleted_jobs(self) -> list[Job]:
return self._successfully_deleted_jobs

async def get_missing_secrets(
self, user_name: str, secret_names: list[str]
) -> list[str]:
@@ -162,17 +164,40 @@ async def get_missing_secrets(
async def get_missing_disks(self, disks: list[Disk]) -> list[Disk]:
pass

def get_preempted_job_ids(self) -> list[str]:
return [job.id for job in self._preempted_jobs]
def update_preemptible_jobs(self, *jobs: Union[Job, list[Job]]) -> None:
self._preemptible_job_ids = []
for job in jobs:
if isinstance(job, Job):
self._preemptible_job_ids.append(job.id)
else:
self._preemptible_job_ids.extend([job.id for job in job])

async def preempt_jobs(
self, jobs_to_schedule: list[Job], preemptible_jobs: list[Job]
) -> list[Job]:
self._preempted_jobs = preemptible_jobs.copy()
return preemptible_jobs
return [job for job in preemptible_jobs if job.id in self._preemptible_job_ids]

def update_scheduled_jobs(self, *jobs: Union[Job, list[Job]]) -> None:
self._scheduled_job_ids = []
for job in jobs:
if isinstance(job, Job):
self._scheduled_job_ids.append(job.id)
else:
self._scheduled_job_ids.extend([job.id for job in job])

async def get_scheduled_jobs(self, jobs: list[Job]) -> list[Job]:
return [job for job in jobs if job.id in self._scheduled_job_ids]

def update_schedulable_jobs(self, *jobs: Union[Job, list[Job]]) -> None:
self._schedulable_job_ids = []
for job in jobs:
if isinstance(job, Job):
self._schedulable_job_ids.append(job.id)
else:
self._schedulable_job_ids.extend([job.id for job in job])

async def get_schedulable_jobs(self, jobs: list[Job]) -> list[Job]:
return jobs
return [job for job in jobs if job.id in self._schedulable_job_ids]


class MockJobsStorage(InMemoryJobsStorage):
@@ -489,7 +514,6 @@ def jobs_service(
mock_jobs_storage: MockJobsStorage,
jobs_config: JobsConfig,
mock_notifications_client: NotificationsClient,
scheduler_config: JobsSchedulerConfig,
mock_auth_client: AuthClient,
mock_admin_client: AdminClient,
mock_api_base: URL,
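
MockOrchestrator now tracks preemptible, scheduled and schedulable job ids through the same update_* pattern, where each positional argument may be a single job or a list of jobs. A self-contained sketch of that flattening logic is below; FakeJob and collect_job_ids are illustrative names, not helpers from the repository.

    from dataclasses import dataclass
    from typing import Union


    @dataclass
    class FakeJob:
        id: str


    def collect_job_ids(*jobs: Union[FakeJob, list[FakeJob]]) -> list[str]:
        # Mirrors update_scheduled_jobs / update_schedulable_jobs: each positional
        # argument may be a single job or a list of jobs.
        ids: list[str] = []
        for job in jobs:
            if isinstance(job, FakeJob):
                ids.append(job.id)
            else:
                ids.extend(item.id for item in job)
        return ids


    assert collect_job_ids(FakeJob("a"), [FakeJob("b"), FakeJob("c")]) == ["a", "b", "c"]

In a unit test this allows, for example, mock_orchestrator.update_scheduled_jobs(job1) next to mock_orchestrator.update_schedulable_jobs([job1, job2]), followed by assertions on what get_scheduled_jobs and get_schedulable_jobs return.
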
4 changes: 2 additions & 2 deletions tests/unit/test_cluster_config_factory.py
@@ -316,7 +316,7 @@ def test_orchestrator_resource_presets_default(
assert clusters[0].orchestrator.presets == [
Preset(name="cpu", credits_per_hour=Decimal("10"), cpu=1, memory_mb=2048)
]
assert clusters[0].orchestrator.has_scheduler_enabled_presets is False
assert clusters[0].orchestrator.allow_scheduler_enabled_job is False

def test_orchestrator_resource_presets_custom(
self, clusters_payload: Sequence[dict[str, Any]]
Expand Down Expand Up @@ -344,7 +344,7 @@ def test_orchestrator_resource_presets_custom(
preemptible_node=True,
),
]
assert clusters[0].orchestrator.has_scheduler_enabled_presets is True
assert clusters[0].orchestrator.allow_scheduler_enabled_job is True

def test_orchestrator_job_schedule_settings_default(
self, clusters_payload: Sequence[dict[str, Any]]