Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
  • Loading branch information
zubenkoivan committed May 10, 2022
1 parent 87ac0b0 commit 207be3e
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 2 deletions.
6 changes: 5 additions & 1 deletion platform_api/orchestrator/poller_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ async def schedule(self, unfinished: list[JobRecord]) -> SchedulingResult:
if job.status.is_pending:
jobs_to_start.append(job)

if jobs_to_start:
max_job_to_start_priority = max(job.priority for job in jobs_to_start)

# Process running jobs
for job in unfinished:
if not job.status.is_running:
Expand All @@ -183,12 +186,13 @@ async def schedule(self, unfinished: list[JobRecord]) -> SchedulingResult:
suspended_at = job.status_history.current.transition_time
if (
not jobs_to_start
or job.priority > max_job_to_start_priority
or now - suspended_at >= self._config.max_suspended_time
):
jobs_to_start.append(job)

# Always give priority to materialized jobs
# as they are already created in orchestrator
# as they have already been created in orchestrator
jobs_to_start.sort(
key=lambda job: (
job.materialized,
Expand Down
88 changes: 87 additions & 1 deletion tests/unit/test_job_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2775,7 +2775,6 @@ async def test_update_jobs_less_than_max_suspended_time(
assert job2.status == JobStatus.RUNNING

mock_orchestrator.update_status_to_return_single(job1.id, JobStatus.PENDING)

mock_orchestrator.update_schedulable_jobs(job1)

await jobs_poller_service.update_jobs_statuses()
Expand Down Expand Up @@ -2892,3 +2891,90 @@ async def test_update_jobs_ordered_by_priority(
job1 = await jobs_service.get_job(job1.id)
assert job1.status == JobStatus.PENDING
assert job1.materialized

async def test_update_jobs_suspended_priority(
self,
jobs_service: JobsService,
jobs_poller_service: JobsPollerService,
mock_orchestrator: MockOrchestrator,
job_request_factory: Callable[[], JobRequest],
test_scheduler: MockJobsScheduler,
test_user: AuthUser,
test_cluster: str,
) -> None:
job1, _ = await jobs_service.create_job(
job_request=job_request_factory(),
user=test_user,
cluster_name=test_cluster,
scheduler_enabled=True,
priority=JobPriority.HIGH,
)

job2, _ = await jobs_service.create_job(
job_request=job_request_factory(),
user=test_user,
cluster_name=test_cluster,
scheduler_enabled=True,
priority=JobPriority.HIGH,
)

job3, _ = await jobs_service.create_job(
job_request=job_request_factory(),
user=test_user,
cluster_name=test_cluster,
scheduler_enabled=True,
priority=JobPriority.NORMAL,
)

mock_orchestrator.update_schedulable_jobs(job1)
mock_orchestrator.update_status_to_return_single(job1.id, JobStatus.RUNNING)

await jobs_poller_service.update_jobs_statuses()

job1 = await jobs_service.get_job(job1.id)
assert job1.status == JobStatus.RUNNING
assert job1.materialized

job2 = await jobs_service.get_job(job2.id)
assert job2.status == JobStatus.PENDING
assert not job2.materialized

job3 = await jobs_service.get_job(job3.id)
assert job3.status == JobStatus.PENDING
assert not job3.materialized

test_scheduler.tick_quantum()

mock_orchestrator.update_preemptible_jobs(job1)
mock_orchestrator.update_status_to_return_single(job2.id, JobStatus.RUNNING)

await jobs_poller_service.update_jobs_statuses()

job1 = await jobs_service.get_job(job1.id)
assert job1.status == JobStatus.SUSPENDED
assert not job1.materialized

job2 = await jobs_service.get_job(job2.id)
assert job2.status == JobStatus.RUNNING
assert job2.materialized

job3 = await jobs_service.get_job(job3.id)
assert job3.status == JobStatus.PENDING
assert not job3.materialized

mock_orchestrator.update_status_to_return_single(job1.id, JobStatus.PENDING)

await jobs_service.cancel_job(job2.id)
await jobs_poller_service.update_jobs_statuses()

job1 = await jobs_service.get_job(job1.id)
assert job1.status == JobStatus.PENDING
assert job1.materialized

job2 = await jobs_service.get_job(job2.id)
assert job2.status == JobStatus.CANCELLED
assert not job2.materialized

job3 = await jobs_service.get_job(job3.id)
assert job3.status == JobStatus.PENDING
assert not job3.materialized

0 comments on commit 207be3e

Please sign in to comment.