Skip to content

Commit

Permalink
Added scheduler methods to explicitly configure tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
agronholm committed Nov 11, 2023
1 parent 3e642c2 commit 1ecb343
Show file tree
Hide file tree
Showing 21 changed files with 1,238 additions and 567 deletions.
10 changes: 10 additions & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ Exceptions
.. autoexception:: apscheduler.TaskLookupError
.. autoexception:: apscheduler.ScheduleLookupError
.. autoexception:: apscheduler.JobLookupError
.. autoexception:: apscheduler.CallableLookupError
.. autoexception:: apscheduler.JobResultNotReady
.. autoexception:: apscheduler.JobCancelled
.. autoexception:: apscheduler.JobDeadlineMissed
Expand All @@ -141,3 +142,12 @@ Support classes for retrying failures

.. autoclass:: apscheduler.RetrySettings
.. autoclass:: apscheduler.RetryMixin

Support classes for unset options
---------------------------------

.. data:: apscheduler.unset

Sentinel value for unset option values.

.. autoclass:: apscheduler.UnsetValue
144 changes: 100 additions & 44 deletions docs/userguide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,16 @@ schedule.

The *scheduler* is the user-facing interface of the system. When it's running, it does
two things concurrently. The first is processing *schedules*. From its *data store*,
it fetches *schedules* due to be run. For each such schedule, it then uses the
schedule's *trigger* to calculate run times up to the present. The scheduler then
it fetches `schedules <schedule>`_ due to be run. For each such schedule, it then uses
the schedule's trigger_ to calculate run times up to the present. The scheduler then
creates one or more jobs (controllable by configuration) based on these run times and
adds them to the data store.

The second role of the scheduler is running jobs. The scheduler asks the data store for
jobs, and then starts running those jobs. If the data store signals that it has new
jobs, the scheduler will try to acquire those jobs if it is capable of accommodating
more. When a scheduler completes a job, it will then also ask the data store for as many
more jobs as it can handle.
The second role of the scheduler is running `jobs <job>`_ The scheduler asks the
`data store`_ for jobs, and then starts running those jobs. If the data store signals
that it has new jobs, the scheduler will try to acquire those jobs if it is capable of
accommodating more. When a scheduler completes a job, it will then also ask the data
store for as many more jobs as it can handle.

By default, schedulers operate in both of these roles, but can be configured to only
process schedules or run jobs if deemed necessary. It may even be desirable to use the
Expand All @@ -55,46 +55,87 @@ Basic concepts / glossary
These are the basic components and concepts of APScheduler which will be referenced
later in this guide.

A *task* encapsulates a Python function and a number of configuration parameters. They
are often implicitly defined as a side effect of the user creating a new schedule
against a function, but can also be explicitly defined beforehand (**TODO**: implement
this!).
.. _callable:

A *trigger* contains the logic and state used to calculate when a scheduled task should
A *callable* is any object that returns ``True`` from :func:`callable`. These are:

* A free function (``def something(...): ...``)
* An instance method (``class Foo: ... def something(self, ...): ...``)
* A class method (``class Foo: ... @classmethod ... def something(cls, ...): ...``)
* A static method (``class Foo: ... @staticmethod ... def something(...): ...``)
* A lambda (``lambda a, b: a + b``)
* An instance of a class that contains a method named ``__call__``)

.. _task:

A *task* encapsulates a callable_ and a number of configuration parameters. They are
often implicitly defined as a side effect of the user creating a new schedule against a
callable_, but can also be :ref:`explicitly defined beforehand <configuring-tasks>`.

.. _trigger:

A trigger_ contains the logic and state used to calculate when a scheduled task_ should
be run.

A *schedule* combines a task with a trigger, plus a number of configuration parameters.
.. _schedule:

A *schedule* combines a task_ with a trigger_, plus a number of configuration
parameters.

A *job* is request for a task to be run. It can be created automatically from a schedule
when a scheduler processes it, or it can be directly created by the user if they
directly request a task to be run.
.. _job:

A *data store* is used to store *schedules* and *jobs*, and to keep track of tasks.
A *job* is request for a task_ to be run. It can be created automatically from a
schedule when a scheduler processes it, or it can be directly created by the user if
they directly request a task_ to be run.

A *job executor* runs the job, by calling the function associated with the job's task.
An executor could directly call the function, or do it in another thread, subprocess or
.. _data store:

A *data store* is used to store `schedules <schedule>`_ and `jobs <job>`_, and to keep
track of `tasks <task>`_.

.. _job executor:

A *job executor* runs the job_, by calling the function associated with the job's task.
An executor could directly call the callable_, or do it in another thread, subprocess or
even some external service.

.. _event broker:

An *event broker* delivers published events to all interested parties. It facilitates
the cooperation between schedulers and workers by notifying them of new or updated
schedules or jobs.
the cooperation between schedulers by notifying them of new or updated
`schedules <schedule>`_ and `jobs <job>`_.

.. _scheduler:

A *scheduler* is the main interface of this library. It houses both a `data store`_ and
an `event broker`_, plus one or more `job executors <job executor>`_. It contains
methods users can use to work with tasks, schedules and jobs. Behind the scenes, it also
processes due schedules, spawning jobs and updating the next run times. It also
processes available jobs, making the appropriate `job executors <job executor>`_ to run
them, and then sending back the results to the `data store`_.

Running the scheduler
=====================

The scheduler_ comes in two flavors: synchronous and asynchronous. The synchronous
scheduler actually runs an asynchronous scheduler behind the scenes in a dedicated
thread, so if your app runs on :mod:`asyncio` or Trio_, you should prefer the
asynchronous scheduler.

The scheduler can run either in the foreground, blocking on a call to
:meth:`~Scheduler.run_until_stopped`, or in the background
where it does its work while letting the rest of the program run.
:meth:`~Scheduler.run_until_stopped`, or in the background where it does its work while
letting the rest of the program run.

If the only intent of your program is to run scheduled tasks, then you should start the
scheduler with :meth:`~Scheduler.run_until_stopped`. But if you need to do other things
too, then you should call :meth:`~Scheduler.start_in_background` before running the rest
of the program.

The scheduler can be used as a context manager. This initializes the underlying data
store and event broker, allowing you to use the scheduler for manipulating tasks and
schedules prior to actually starting it. Exiting the context manager will shut down
the scheduler and its underlying services. This mode of operation is mandatory for the
In almost all cases, the scheduler should be used as a context manager. This initializes
the underlying `data store`_ and `event broker`_, allowing you to use the scheduler for
manipulating `tasks <task>`_, `schedules <schedule>`_ and jobs prior to starting the
processing of schedules and jobs. Exiting the context manager will shut down the
scheduler and its underlying services. This mode of operation is mandatory for the
asynchronous scheduler when running it in the background, but it is preferred for the
synchronous scheduler too.

Expand All @@ -114,9 +155,9 @@ of the scheduler before the process terminates.

from apscheduler import Scheduler

scheduler = Scheduler()
# Add schedules, configure tasks here
scheduler.run_until_stopped()
with Scheduler() as scheduler:
# Add schedules, configure tasks here
scheduler.run_until_stopped()

.. code-tab:: python Synchronous (background thread; preferred method)

Expand Down Expand Up @@ -160,22 +201,39 @@ of the scheduler before the process terminates.

asyncio.run(main())

.. _configuring-tasks:

Configuring tasks
=================

In order to add `schedules <schedule>`_ or `jobs <job>`_ to the `data store`_, you need
to have a task_ that defines which callable_ will be called when each job_ is run.

In most cases, you don't need to go through this step, and instead have a task_
implicitly created for you by the methods that add `schedules or jobs.

Explicitly configuring a task is generally only necessary in the following cases:

* You need to have more than one task with the same callable
* You need to set any of the task settings to non-default values
* You need to add schedules/jobs targeting lambdas, nested functions or instances of
unserializable classes

Scheduling tasks
================

To create a schedule for running a task, you need, at the minimum:

* A *callable* to be run
* A *trigger*
* A preconfigured task_, OR a callable_ to be run
* A trigger_

.. note:: Scheduling lambdas or nested functions is currently not possible. This will be
fixed before the final release.
If you've configured a task (as per the previous section), you can pass the task object
or its ID to :meth:`Scheduler.add_schedule`. As a shortcut, you can pass a callable_
instead, in which case a task will be automatically created for you if necessary.

The callable can be a function or method, lambda or even an instance of a class that
contains the ``__call__()`` method. With the default (memory based) data store, any
callable can be used as a task callable. Persistent data stores (more on those below)
place some restrictions on the kinds of callables can be used because they cannot store
the callable directly but instead need to be able to locate it with a *reference*.
If the callable you're trying to schedule is either a lambda or a nested function, then
you need to explicitly create a task beforehand, as it is not possible to create a
reference (``package.module:varname``) to these types of callables.

The trigger determines the scheduling logic for your schedule. In other words, it is
used to calculate the datetimes on which the task will be run. APScheduler comes with a
Expand Down Expand Up @@ -283,8 +341,7 @@ still running, the later job is terminated with the outcome of
:attr:`~JobOutcome.missed_start_deadline`.

To allow more jobs to be concurrently running for a task, pass the desired maximum
number as the ``max_running_jobs`` keyword argument to
:meth:`~Scheduler.add_schedule`.~
number as the ``max_running_jobs`` keyword argument to :meth:`~Scheduler.add_schedule`.

Controlling how much a job can be started late
----------------------------------------------
Expand Down Expand Up @@ -360,8 +417,8 @@ Schedulers have the ability to notify listeners when some event occurs in the sc
system. Examples of such events would be schedulers or workers starting up or shutting
down, or schedules or jobs being created or removed from the data store.

To listen to events, you need a callable that takes a single positional argument which
is the event object. Then, you need to decide which events you're interested in:
To listen to events, you need a callable_ that takes a single positional argument
which is the event object. Then, you need to decide which events you're interested in:

.. tabs::

Expand Down Expand Up @@ -408,7 +465,6 @@ When you need your schedules and jobs to survive the application shutting down,
to use a *persistent data store*. Such data stores do have additional considerations,
compared to the memory data store:

* The task callable cannot be a lambda or a nested function
* Task arguments must be *serializable*
* You must either trust the data store, or use an alternate *serializer*
* A *conflict policy* and an *explicit identifier* must be defined for schedules that
Expand Down
4 changes: 4 additions & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ APScheduler, see the :doc:`migration section <migration>`.
- **BREAKING** Switched to using the timezone aware timestamp column type on Oracle
- **BREAKING** Fixed precision issue with interval columns on MySQL
- **BREAKING** Worked around datetime microsecond precision issue on MongoDB
- **BREAKING** Renamed the ``worker_id`` field to ``scheduler_id`` in the
``JobAcquired`` and ``JobReleased`` events
- Added the ``configure_task()`` and ``get_tasks()`` scheduler methods
- Fixed out of order delivery of events delivered using worker threads
- Fixed schedule processing not setting job start deadlines correctly

**4.0.0a3**

Expand Down
8 changes: 3 additions & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,12 @@ cbor = ["cbor2 >= 5.0"]
mongodb = ["pymongo >= 4"]
mqtt = ["paho-mqtt >= 1.5"]
redis = ["redis >= 4.4.0"]
sqlalchemy = [
"sqlalchemy >= 2.0.19",
"greenlet >= 3.0.0a1; python_version >= '3.12'",
]
sqlalchemy = ["sqlalchemy >= 2.0.19"]
test = [
"APScheduler[cbor,mongodb,mqtt,redis,sqlalchemy]",
"asyncpg >= 0.20; python_implementation == 'CPython'",
"aiosqlite >= 0.19",
"anyio[trio]",
"asyncmy >= 0.2.5; python_implementation == 'CPython'",
"coverage >= 7",
"freezegun",
Expand All @@ -66,7 +64,7 @@ test = [
"pytest-freezer",
"pytest-lazy-fixture",
"pytest-mock",
"trio",
"uwsgi; python_version < '3.12'",
]
doc = [
"sphinx",
Expand Down
2 changes: 1 addition & 1 deletion src/apscheduler/_converters.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
def as_aware_datetime(value: datetime | str) -> datetime:
"""Convert the value from a string to a timezone aware datetime."""
if isinstance(value, str):
# fromisoformat() does not handle the "Z" suffix
# Before Python 3.11, fromisoformat() could not handle the "Z" suffix
if value.upper().endswith("Z"):
value = value[:-1] + "+00:00"

Expand Down
10 changes: 5 additions & 5 deletions src/apscheduler/_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,11 @@ class JobAcquired(SchedulerEvent):
Signals that a worker has acquired a job for processing.
:param job_id: the ID of the job that was acquired
:param worker_id: the ID of the worker that acquired the job
:param scheduler_id: the ID of the scheduler that acquired the job
"""

job_id: UUID = attrs.field(converter=as_uuid)
worker_id: str
scheduler_id: str


@attrs.define(kw_only=True, frozen=True)
Expand All @@ -223,7 +223,7 @@ class JobReleased(SchedulerEvent):
Signals that a worker has finished processing of a job.
:param uuid.UUID job_id: the ID of the job that was released
:param worker_id: the ID of the worker that released the job
:param scheduler_id: the ID of the worker that released the job
:param outcome: the outcome of the job
:param exception_type: the fully qualified name of the exception if ``outcome`` is
:attr:`JobOutcome.error`
Expand All @@ -234,7 +234,7 @@ class JobReleased(SchedulerEvent):
"""

job_id: UUID = attrs.field(converter=as_uuid)
worker_id: str
scheduler_id: str
outcome: JobOutcome = attrs.field(converter=as_enum(JobOutcome))
exception_type: str | None = None
exception_message: str | None = None
Expand All @@ -253,7 +253,7 @@ def from_result(cls, result: JobResult, worker_id: str) -> JobReleased:

return cls(
job_id=result.job_id,
worker_id=worker_id,
scheduler_id=worker_id,
outcome=result.outcome,
exception_type=exception_type,
exception_message=exception_message,
Expand Down
4 changes: 4 additions & 0 deletions src/apscheduler/_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ def __init__(self, job_id: UUID):
super().__init__(f"No job by the id of {job_id} was found")


class CallableLookupError(LookupError):
"""Raised when the target callable for a job could not be found."""


class JobResultNotReady(Exception):
"""
Raised by :meth:`~Scheduler.get_job_result` if the job result is
Expand Down
Loading

0 comments on commit 1ecb343

Please sign in to comment.