Skip to content

Commit

Permalink
Test and fix issues from integer times
Browse files Browse the repository at this point in the history
  • Loading branch information
AlanCoding committed Feb 10, 2025
1 parent 0b98731 commit 8e2d207
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 7 deletions.
10 changes: 5 additions & 5 deletions dispatcher/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def __init__(self, worker_id: int, finished_queue: multiprocessing.Queue):

# Info specific to the current task being ran
self.current_task: Optional[dict] = None
self.started_at: Optional[float] = None
self.started_at: Optional[int] = None
self.is_active_cancel: bool = False

# Tracking information for worker
Expand Down Expand Up @@ -155,7 +155,7 @@ async def manage_workers(self) -> None:
self.events.management_event.clear()
logger.debug('Pool worker management task exiting')

async def process_worker_timeouts(self, current_time: float) -> Optional[float]:
async def process_worker_timeouts(self, current_time: float) -> Optional[int]:
"""
Cancels tasks that have exceeded their timeout.
Returns the system clock time of the next task timeout, for rescheduling.
Expand All @@ -164,12 +164,12 @@ async def process_worker_timeouts(self, current_time: float) -> Optional[float]:
for worker in self.workers.values():
if (not worker.is_active_cancel) and worker.current_task and worker.started_at and (worker.current_task.get('timeout')):
timeout: float = worker.current_task['timeout']
worker_deadline = worker.started_at + timeout
worker_deadline = worker.started_at + int(timeout * 1.0e9)

# Established that worker is running a task that has a timeout
if worker_deadline < current_time:
uuid: str = worker.current_task.get('uuid', '<unknown>')
delta: float = current_time - worker.started_at
delta: float = (current_time - worker.started_at) * 1.0e9
logger.info(f'Worker {worker.worker_id} runtime {delta:.5f}(s) for task uuid={uuid} exceeded timeout {timeout}(s), canceling')
worker.cancel()
elif next_deadline is None or worker_deadline < next_deadline:
Expand All @@ -183,7 +183,7 @@ async def manage_timeout(self) -> None:
current_time = time.monotonic_ns()
pool_deadline = await self.process_worker_timeouts(current_time)
if pool_deadline:
time_until_deadline = pool_deadline - current_time
time_until_deadline = (pool_deadline - current_time) * 1.0e-9
try:
await asyncio.wait_for(self.events.timeout_event.wait(), timeout=time_until_deadline)
except asyncio.TimeoutError:
Expand Down
48 changes: 46 additions & 2 deletions tests/integration/test_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@
from tests.data import methods as test_methods

SLEEP_METHOD = 'lambda: __import__("time").sleep(1.5)'
LIGHT_SLEEP_METHOD = 'lambda: __import__("time").sleep(0.03)'


@pytest.mark.asyncio
async def test_task_timeout(apg_dispatcher, pg_message):
assert apg_dispatcher.pool.finished_count == 0

start_time = time.monotonic_ns()
start_time = time.monotonic()

clearing_task = asyncio.create_task(apg_dispatcher.pool.events.work_cleared.wait())
await pg_message(json.dumps({
Expand All @@ -22,7 +23,50 @@ async def test_task_timeout(apg_dispatcher, pg_message):
}))
await asyncio.wait_for(clearing_task, timeout=3)

delta = time.monotonic_ns() - start_time
delta = time.monotonic() - start_time

assert delta < 1.0 # proves task did not run to completion
assert apg_dispatcher.pool.canceled_count == 1


@pytest.mark.asyncio
async def test_multiple_task_timeouts(apg_dispatcher, pg_message):
assert apg_dispatcher.pool.finished_count == 0

start_time = time.monotonic()

clearing_task = asyncio.create_task(apg_dispatcher.pool.events.work_cleared.wait())
for i in range(5):
await pg_message(json.dumps({
'task': SLEEP_METHOD,
'timeout': 0.01*i + 0.01,
'uuid': f'test_multiple_task_timeouts_{i}'
}))
await asyncio.wait_for(clearing_task, timeout=3)

delta = time.monotonic() - start_time

assert delta < 1.0 # proves task did not run to completion
assert apg_dispatcher.pool.canceled_count == 5


@pytest.mark.asyncio
async def test_mixed_timeouts_non_timeouts(apg_dispatcher, pg_message):
assert apg_dispatcher.pool.finished_count == 0

start_time = time.monotonic()

clearing_task = asyncio.create_task(apg_dispatcher.pool.events.work_cleared.wait())
for i in range(6):
await pg_message(json.dumps({
'task': SLEEP_METHOD if (i % 2) else LIGHT_SLEEP_METHOD,
'timeout': 0.01 * (i % 2),
'uuid': f'test_multiple_task_timeouts_{i}'
}))
await asyncio.wait_for(clearing_task, timeout=3)

delta = time.monotonic() - start_time

assert delta < 1.0
# half of the tasks should be finished, half should have been canceled
assert apg_dispatcher.pool.canceled_count == 3

0 comments on commit 8e2d207

Please sign in to comment.