diff --git a/docs/userguide.rst b/docs/userguide.rst index 4b0348cc2..84fe1d63a 100644 --- a/docs/userguide.rst +++ b/docs/userguide.rst @@ -332,6 +332,31 @@ the schedule you want to remove as an argument. This is the ID you got from Note that removing a schedule does not cancel any jobs derived from it, but does prevent further jobs from being created from that schedule. +Pausing schedules +----------------- + +To pause a schedule, call :meth:`~Scheduler.pause_schedule`. Pass the identifier of the +schedule you want to pause as an argument. This is the ID you got from +:meth:`~Scheduler.add_schedule`. + +Pausing a schedule prevents any new jobs from being created from it, but does not cancel +any jobs that have already been created from that schedule. + +The schedule can be unpaused by calling :meth:`~Scheduler.unpause_schedule` with the +identifier of the schedule you want to unpause. + +By default the schedule will retain the next fire time it had when it was paused, which +may result in the schedule being considered to have misfired when it is unpaused, +resulting in whatever misfire behavior it has configured +(see :ref:`controlling-how-much-a-job-can-be-started-late` for more details). + +The ``resume_from`` parameter can be used to specify the time from which the schedule +should be resumed. This can be used to avoid the misfire behavior mentioned above. It +can be either a datetime object, or the string ``"now"`` as a convenient shorthand for +the current datetime. If this parameter is provided, the schedules trigger will be +repeatedly advanced to determine a next fire time that is at or after the specified time +to resume from. + Limiting the number of concurrently executing instances of a job ---------------------------------------------------------------- @@ -344,6 +369,8 @@ still running, the later job is terminated with the outcome of To allow more jobs to be concurrently running for a task, pass the desired maximum number as the ``max_running_jobs`` keyword argument to :meth:`~Scheduler.add_schedule`. +.. _controlling-how-much-a-job-can-be-started-late: + Controlling how much a job can be started late ---------------------------------------------- diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index 22bf8efc0..9b13bb515 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -15,6 +15,7 @@ APScheduler, see the :doc:`migration section `. - **BREAKING** Made publishing ``JobReleased`` events the responsibility of the ``DataStore`` implementation, rather than the scheduler, for consistency with the ``acquire_jobs()`` method +- Added the ability to pause and unpause schedules (PR by @WillDaSilva) - Fixed large parts of ``MongoDBDataStore`` still calling blocking functions in the event loop thread - Fixed JSON serialization of triggers that had been used at least once diff --git a/src/apscheduler/_schedulers/async_.py b/src/apscheduler/_schedulers/async_.py index 4a1f9a39c..22c64654a 100644 --- a/src/apscheduler/_schedulers/async_.py +++ b/src/apscheduler/_schedulers/async_.py @@ -11,7 +11,7 @@ from inspect import isbuiltin, isclass, ismethod, ismodule from logging import Logger, getLogger from types import TracebackType -from typing import Any, Callable, Iterable, Mapping, cast, overload +from typing import Any, Callable, Iterable, Literal, Mapping, cast, overload from uuid import UUID, uuid4 import anyio @@ -410,6 +410,7 @@ async def add_schedule( id: str | None = None, args: Iterable | None = None, kwargs: Mapping[str, Any] | None = None, + paused: bool = False, job_executor: str | UnsetValue = unset, coalesce: CoalescePolicy = CoalescePolicy.latest, misfire_grace_time: float | timedelta | None | UnsetValue = unset, @@ -427,6 +428,7 @@ async def add_schedule( based ID will be assigned) :param args: positional arguments to be passed to the task function :param kwargs: keyword arguments to be passed to the task function + :param paused: whether the schedule is paused :param job_executor: name of the job executor to run the task with :param coalesce: determines what to do when processing the schedule if multiple fire times have become due for this schedule since the last processing @@ -478,6 +480,7 @@ async def add_schedule( trigger=trigger, args=args, kwargs=kwargs, + paused=paused, coalesce=coalesce, misfire_grace_time=task.misfire_grace_time if misfire_grace_time is unset @@ -529,6 +532,58 @@ async def remove_schedule(self, id: str) -> None: self._check_initialized() await self.data_store.remove_schedules({id}) + async def pause_schedule(self, id: str) -> None: + """Pause the specified schedule.""" + self._check_initialized() + await self.data_store.add_schedule( + schedule=attrs.evolve(await self.get_schedule(id), paused=True), + conflict_policy=ConflictPolicy.replace, + ) + + async def unpause_schedule( + self, + id: str, + *, + resume_from: datetime | Literal["now"] | None = None, + ) -> None: + """ + Unpause the specified schedule. + + + :param resume_from: the time to resume the schedules from, or ``'now'`` as a + shorthand for ``datetime.now(tz=UTC)`` or ``None`` to resume from where the + schedule left off which may cause it to misfire + + """ + self._check_initialized() + schedule = await self.get_schedule(id) + + if resume_from == "now": + resume_from = datetime.now(tz=timezone.utc) + + if resume_from is None: + next_fire_time = schedule.next_fire_time + elif ( + schedule.next_fire_time is not None + and schedule.next_fire_time >= resume_from + ): + next_fire_time = schedule.next_fire_time + else: + # Advance `next_fire_time` until its at or past `resume_from`, or until it's + # exhausted + while next_fire_time := schedule.trigger.next(): + if next_fire_time is None or next_fire_time >= resume_from: + break + + await self.data_store.add_schedule( + schedule=attrs.evolve( + schedule, + paused=False, + next_fire_time=next_fire_time, + ), + conflict_policy=ConflictPolicy.replace, + ) + async def add_job( self, func_or_task_id: TaskType, diff --git a/src/apscheduler/_schedulers/sync.py b/src/apscheduler/_schedulers/sync.py index 198205015..8630821e6 100644 --- a/src/apscheduler/_schedulers/sync.py +++ b/src/apscheduler/_schedulers/sync.py @@ -6,11 +6,11 @@ import threading from collections.abc import MutableMapping, Sequence from contextlib import ExitStack -from datetime import timedelta +from datetime import datetime, timedelta from functools import partial from logging import Logger from types import TracebackType -from typing import Any, Callable, Iterable, Mapping, overload +from typing import Any, Callable, Iterable, Literal, Mapping, overload from uuid import UUID from anyio.from_thread import BlockingPortal, start_blocking_portal @@ -238,6 +238,7 @@ def add_schedule( id: str | None = None, args: Iterable | None = None, kwargs: Mapping[str, Any] | None = None, + paused: bool = False, job_executor: str | UnsetValue = unset, coalesce: CoalescePolicy = CoalescePolicy.latest, misfire_grace_time: float | timedelta | None | UnsetValue = unset, @@ -254,6 +255,7 @@ def add_schedule( id=id, args=args, kwargs=kwargs, + paused=paused, job_executor=job_executor, coalesce=coalesce, misfire_grace_time=misfire_grace_time, @@ -275,6 +277,25 @@ def remove_schedule(self, id: str) -> None: self._ensure_services_ready() self._portal.call(self._async_scheduler.remove_schedule, id) + def pause_schedule(self, id: str) -> None: + self._ensure_services_ready() + self._portal.call(self._async_scheduler.pause_schedule, id) + + def unpause_schedule( + self, + id: str, + *, + resume_from: datetime | Literal["now"] | None = None, + ) -> None: + self._ensure_services_ready() + self._portal.call( + partial( + self._async_scheduler.unpause_schedule, + id, + resume_from=resume_from, + ) + ) + def add_job( self, func_or_task_id: TaskType, diff --git a/src/apscheduler/_structures.py b/src/apscheduler/_structures.py index ed9023f2d..55e2d9a1c 100644 --- a/src/apscheduler/_structures.py +++ b/src/apscheduler/_structures.py @@ -74,6 +74,7 @@ class Schedule: :var str task_id: unique identifier of the task to be run on this schedule :var tuple args: positional arguments to pass to the task callable :var dict[str, Any] kwargs: keyword arguments to pass to the task callable + :var bool paused: whether the schedule is paused :var CoalescePolicy coalesce: determines what to do when processing the schedule if multiple fire times have become due for this schedule since the last processing :var ~datetime.timedelta | None misfire_grace_time: maximum number of seconds the @@ -105,6 +106,7 @@ class Schedule: kwargs: dict[str, Any] = attrs.field( eq=False, order=False, converter=dict, default=() ) + paused: bool = attrs.field(eq=False, order=False, default=False) coalesce: CoalescePolicy = attrs.field( eq=False, order=False, diff --git a/src/apscheduler/datastores/memory.py b/src/apscheduler/datastores/memory.py index 40477a9ca..49003b692 100644 --- a/src/apscheduler/datastores/memory.py +++ b/src/apscheduler/datastores/memory.py @@ -207,8 +207,12 @@ async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedul schedules: list[Schedule] = [] for state in self._schedules: if state.next_fire_time is None or state.next_fire_time > now: - # The schedule is either paused or not yet due + # The schedule is either exhausted or not yet due. There will be no + # schedules that are due after this one, so we can stop here. break + elif state.schedule.paused: + # The schedule is paused + continue elif state.acquired_by is not None: if state.acquired_by != scheduler_id and now <= state.acquired_until: # The schedule has been acquired by another scheduler and the diff --git a/src/apscheduler/datastores/mongodb.py b/src/apscheduler/datastores/mongodb.py index 57687f998..8009d6b28 100644 --- a/src/apscheduler/datastores/mongodb.py +++ b/src/apscheduler/datastores/mongodb.py @@ -366,9 +366,19 @@ async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedul lambda: self._schedules.find( { "next_fire_time": {"$lte": now}, - "$or": [ - {"acquired_until": {"$exists": False}}, - {"acquired_until": {"$lt": now}}, + "$and": [ + { + "$or": [ + {"paused": {"$exists": False}}, + {"paused": False}, + ] + }, + { + "$or": [ + {"acquired_until": {"$exists": False}}, + {"acquired_until": {"$lt": now}}, + ] + }, ], }, session=session, diff --git a/src/apscheduler/datastores/sqlalchemy.py b/src/apscheduler/datastores/sqlalchemy.py index ddc82ea86..17f2ec132 100644 --- a/src/apscheduler/datastores/sqlalchemy.py +++ b/src/apscheduler/datastores/sqlalchemy.py @@ -17,6 +17,7 @@ from anyio import CancelScope, to_thread from sqlalchemy import ( BigInteger, + Boolean, Column, DateTime, Enum, @@ -31,6 +32,7 @@ Uuid, and_, bindparam, + false, or_, select, ) @@ -293,6 +295,7 @@ def get_table_definitions(self) -> MetaData: Column("trigger", LargeBinary), Column("args", LargeBinary), Column("kwargs", LargeBinary), + Column("paused", Boolean, nullable=False, server_default=literal(False)), Column("coalesce", Enum(CoalescePolicy), nullable=False), Column("misfire_grace_time", interval_type), Column("max_jitter", interval_type), @@ -600,6 +603,7 @@ async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedul and_( self._t_schedules.c.next_fire_time.isnot(None), comparison, + self._t_schedules.c.paused == false(), or_( self._t_schedules.c.acquired_until.is_(None), self._t_schedules.c.acquired_until < now, @@ -752,7 +756,10 @@ async def get_next_schedule_run_time(self) -> datetime | None: statenent = ( select(*columns) - .where(self._t_schedules.c.next_fire_time.isnot(None)) + .where( + self._t_schedules.c.next_fire_time.isnot(None), + self._t_schedules.c.paused == false(), + ) .order_by(self._t_schedules.c.next_fire_time) .limit(1) ) diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py index 34b7adfd3..38c3aaf98 100644 --- a/tests/test_schedulers.py +++ b/tests/test_schedulers.py @@ -192,10 +192,10 @@ async def test_configure_task(self, raw_datastore: DataStore) -> None: assert isinstance(event, TaskUpdated) assert event.task_id == "mytask" - async def test_add_remove_schedule( + async def test_add_pause_unpause_remove_schedule( self, raw_datastore: DataStore, timezone: ZoneInfo ) -> None: - send, receive = create_memory_object_stream[Event](3) + send, receive = create_memory_object_stream[Event](5) async with AsyncScheduler(data_store=raw_datastore) as scheduler: scheduler.subscribe(send.send) now = datetime.now(timezone) @@ -210,6 +210,16 @@ async def test_add_remove_schedule( assert schedules[0].id == "foo" assert schedules[0].task_id == f"{__name__}:dummy_async_job" + await scheduler.pause_schedule("foo") + schedule = await scheduler.get_schedule("foo") + assert schedule.paused + assert schedule.next_fire_time == now + + await scheduler.unpause_schedule("foo") + schedule = await scheduler.get_schedule("foo") + assert not schedule.paused + assert schedule.next_fire_time == now + await scheduler.remove_schedule(schedule_id) assert not await scheduler.get_schedules() @@ -224,6 +234,18 @@ async def test_add_remove_schedule( assert event.task_id == f"{__name__}:dummy_async_job" assert event.next_fire_time == now + event = await receive.receive() + assert isinstance(event, ScheduleUpdated) + assert event.schedule_id == "foo" + assert event.task_id == f"{__name__}:dummy_async_job" + assert event.next_fire_time == now + + event = await receive.receive() + assert isinstance(event, ScheduleUpdated) + assert event.schedule_id == "foo" + assert event.task_id == f"{__name__}:dummy_async_job" + assert event.next_fire_time == now + event = await receive.receive() assert isinstance(event, ScheduleRemoved) assert event.schedule_id == "foo"