diff --git a/src/apscheduler/_schedulers/async_.py b/src/apscheduler/_schedulers/async_.py index 9ecd9020..d6fc3d54 100644 --- a/src/apscheduler/_schedulers/async_.py +++ b/src/apscheduler/_schedulers/async_.py @@ -832,7 +832,10 @@ async def wait_until_stopped(self) -> None: async def start_in_background(self) -> None: self._check_initialized() - await self._services_task_group.start(self.run_until_stopped) + await self._services_task_group.start( + self.run_until_stopped, + name=f"Scheduler {self.identity!r} main task", + ) async def run_until_stopped( self, *, task_status: TaskStatus = TASK_STATUS_IGNORED @@ -860,7 +863,10 @@ async def run_until_stopped( # Start periodic cleanups if self.cleanup_interval: - task_group.start_soon(self._cleanup_loop) + task_group.start_soon( + self._cleanup_loop, + name=f"Scheduler {self.identity!r} clean-up loop", + ) self.logger.debug( "Started internal cleanup loop with interval: %s", self.cleanup_interval, @@ -868,11 +874,17 @@ async def run_until_stopped( # Start processing due schedules, if configured to do so if self.role in (SchedulerRole.scheduler, SchedulerRole.both): - await task_group.start(self._process_schedules) + await task_group.start( + self._process_schedules, + name=f"Scheduler {self.identity!r} schedule processing loop", + ) # Start processing due jobs, if configured to do so if self.role in (SchedulerRole.worker, SchedulerRole.both): - await task_group.start(self._process_jobs) + await task_group.start( + self._process_jobs, + name=f"Scheduler {self.identity!r} job processing loop", + ) # Signal that the scheduler has started self._state = RunState.started @@ -937,7 +949,14 @@ async def extend_schedule_leases(schedules: Sequence[Schedule]) -> None: ) async with AsyncExitStack() as exit_stack: tg = await exit_stack.enter_async_context(create_task_group()) - tg.start_soon(extend_schedule_leases, schedules) + tg.start_soon( + extend_schedule_leases, + schedules, + name=( + f"Scheduler {self.identity!r} schedule lease extension " + f"loop" + ), + ) exit_stack.callback(tg.cancel_scope.cancel) now = datetime.now(timezone.utc) @@ -1101,8 +1120,11 @@ async def extend_job_leases() -> None: for job_executor in self.job_executors.values(): await job_executor.start(exit_stack) - outer_tg = await exit_stack.enter_async_context(create_task_group()) - outer_tg.start_soon(extend_job_leases) + task_group = await exit_stack.enter_async_context(create_task_group()) + task_group.start_soon( + extend_job_leases, + name=f"Scheduler {self.identity!r} job lease extension loop", + ) # Fetch new jobs every time exit_stack.enter_context( @@ -1121,18 +1143,20 @@ async def extend_job_leases() -> None: jobs = await self.data_store.acquire_jobs( self.identity, self.lease_duration, limit ) - async with AsyncExitStack() as inner_exit_stack: - inner_tg = await inner_exit_stack.enter_async_context( - create_task_group() + for job in jobs: + task = await self.data_store.get_task(job.task_id) + func = self._get_task_callable(task) + self._running_jobs.add(job) + task_group.start_soon( + self._run_job, + job, + func, + job.executor, + name=( + f"Scheduler {self.identity!r} job {job.id} " + f"({job.executor!r})" + ), ) - inner_exit_stack.callback(inner_tg.cancel_scope.cancel) - inner_tg.start_soon(extend_job_leases) - - for job in jobs: - task = await self.data_store.get_task(job.task_id) - func = self._get_task_callable(task) - self._running_jobs.add(job) - outer_tg.start_soon(self._run_job, job, func, job.executor) await wakeup_event.wait() wakeup_event = anyio.Event()