Refactored data stores to better deal with schedule and job expiration
Fixes #864.
agronholm committed Jul 14, 2024
1 parent 43acae0 commit f9703ef
Showing 9 changed files with 381 additions and 87 deletions.
3 changes: 3 additions & 0 deletions docs/extending.rst
@@ -135,6 +135,9 @@ several abstract methods:
 * :meth:`~abc.DataStore.acquire_jobs`
 * :meth:`~abc.DataStore.release_job`
 * :meth:`~abc.DataStore.get_job_result`
+* :meth:`~abc.DataStore.extend_acquired_schedule_leases`
+* :meth:`~abc.DataStore.extend_acquired_job_leases`
+* :meth:`~abc.DataStore.cleanup`
 
 The :meth:`~abc.DataStore.start` method is where your implementation can perform any
 initialization, including starting any background tasks. This method is called with two
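For a concrete picture, here is a minimal sketch of how a custom data store might
implement the two new lease-extension methods. The ``_schedule_leases`` and
``_job_leases`` dictionaries are hypothetical internal state invented for this example;
only the method signatures come from the actual interface:

    from datetime import datetime, timedelta, timezone
    from uuid import UUID


    class MyDataStore:
        """Fragment of a hypothetical DataStore implementation."""

        lock_expiration_delay = timedelta(seconds=30)

        def __init__(self) -> None:
            # Maps schedule/job IDs to (owner scheduler ID, lease deadline)
            self._schedule_leases: dict[str, tuple[str, datetime]] = {}
            self._job_leases: dict[UUID, tuple[str, datetime]] = {}

        async def extend_acquired_schedule_leases(
            self, scheduler_id: str, schedule_ids: set[str]
        ) -> None:
            # Push the deadline forward, but only for leases this scheduler owns
            deadline = datetime.now(timezone.utc) + self.lock_expiration_delay
            for schedule_id in schedule_ids:
                owner, _ = self._schedule_leases.get(schedule_id, (None, None))
                if owner == scheduler_id:
                    self._schedule_leases[schedule_id] = (scheduler_id, deadline)

        async def extend_acquired_job_leases(
            self, scheduler_id: str, job_ids: set[UUID]
        ) -> None:
            deadline = datetime.now(timezone.utc) + self.lock_expiration_delay
            for job_id in job_ids:
                owner, _ = self._job_leases.get(job_id, (None, None))
                if owner == scheduler_id:
                    self._job_leases[job_id] = (scheduler_id, deadline)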
11 changes: 5 additions & 6 deletions docs/userguide.rst
@@ -479,13 +479,12 @@ When **distributed** event brokers (that is, other than the default one) are being used,
 events other than the ones relating to the life cycles of schedulers and workers, will
 be sent to all schedulers and workers connected to that event broker.
 
-Clean-up of expired jobs and schedules
-======================================
+Clean-up of expired jobs, job results and schedules
+===================================================
 
-Expired job results and finished schedules are, by default, automatically cleaned up by
-each running scheduler on 15 minute intervals (counting from the scheduler's start
-time). This can be adjusted (or disabled entirely) through the ``cleanup_interval``
-configuration option.
+Each scheduler runs the data store's :meth:`~DataStore.cleanup` method periodically,
+configurable via the ``cleanup_interval`` scheduler parameter. This ensures that the
+data store doesn't get filled with unused data over time.
 
 Deployment
 ==========
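A short configuration sketch of the above (assuming, per the paragraph, that
``cleanup_interval`` is accepted by the scheduler's constructor, that a ``timedelta``
works there, and that ``None`` disables the periodic clean-up):

    import asyncio
    from datetime import timedelta

    from apscheduler import AsyncScheduler


    async def main() -> None:
        # Run the data store's cleanup() every 30 minutes; cleanup_interval=None
        # would disable periodic clean-up entirely
        async with AsyncScheduler(cleanup_interval=timedelta(minutes=30)) as scheduler:
            await scheduler.run_until_stopped()


    asyncio.run(main())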
13 changes: 12 additions & 1 deletion docs/versionhistory.rst
@@ -8,11 +8,22 @@ APScheduler, see the :doc:`migration section <migration>`.
 
 - **BREAKING** Refactored ``AsyncpgEventBroker`` to directly accept a connection string,
   thus eliminating the need for the ``AsyncpgEventBroker.from_dsn()`` class method
+- **BREAKING** Added the ``extend_acquired_schedule_leases()`` data store method to
+  prevent other schedulers from acquiring schedules already being processed by a
+  scheduler, if that's taking unexpectedly long for some reason
+- **BREAKING** Added the ``extend_acquired_job_leases()`` data store method to prevent
+  jobs from being cleaned up as if they had been abandoned
+  (`#864 <https://github.com/agronholm/apscheduler/issues/864>`_)
+- **BREAKING** Changed the ``cleanup()`` data store method to also be responsible for
+  releasing jobs whose leases have expired (so the schedulers responsible for them have
+  probably died)
 - Added the ``psycopg`` event broker
 - Added useful indexes and removed useless ones in ``SQLAlchemyDataStore`` and
   ``MongoDBDataStore``
+- Changed the ``lock_expiration_delay`` parameter of built-in data stores to accept a
+  ``timedelta`` as well as ``int`` or ``float``
 - Fixed serialization error with ``CronTrigger`` when pausing a schedule
-  (`#864 <https://github.com/agronholm/apscheduler/issues/864>`_)
+  (`#923 <https://github.com/agronholm/apscheduler/issues/923>`_)
 - Fixed ``TypeError: object NoneType can't be used in 'await' expression`` at teardown
   of ``SQLAlchemyDataStore`` when it was passed a URL that implicitly created a
   synchronous engine
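To illustrate the ``lock_expiration_delay`` entry above, both spellings below should now
be equivalent, since bare numbers are still interpreted as seconds; the engine URL is a
placeholder:

    from datetime import timedelta

    from sqlalchemy.ext.asyncio import create_async_engine

    from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore

    engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/mydb")
    store_a = SQLAlchemyDataStore(engine, lock_expiration_delay=30)  # seconds
    store_b = SQLAlchemyDataStore(engine, lock_expiration_delay=timedelta(seconds=30))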
32 changes: 30 additions & 2 deletions src/apscheduler/abc.py
@@ -330,11 +330,39 @@ async def get_job_result(self, job_id: UUID) -> JobResult | None:
         :return: the result, or ``None`` if the result was not found
         """
 
+    async def extend_acquired_schedule_leases(
+        self, scheduler_id: str, schedule_ids: set[str]
+    ) -> None:
+        """
+        Extend the leases of specified schedules acquired by the given scheduler.
+
+        :param scheduler_id: unique identifier of the scheduler
+        :param schedule_ids: the identifiers of the schedules the scheduler is currently
+            processing
+        """
+
+    async def extend_acquired_job_leases(
+        self, scheduler_id: str, job_ids: set[UUID]
+    ) -> None:
+        """
+        Extend the leases of specified jobs acquired by the given scheduler.
+
+        :param scheduler_id: unique identifier of the scheduler
+        :param job_ids: the identifiers of the jobs the scheduler is running
+        """
+
     @abstractmethod
     async def cleanup(self) -> None:
         """
-        Purge expired job results and finished schedules that have no running jobs
-        associated with them.
+        Perform clean-up operations on the data store.
+
+        This method must perform the following operations (in this order):
+
+        * Purge expired job results (where ``expires_at`` is less or equal to the
+          current time)
+        * Release jobs with expired leases with the ``cancelled`` outcome
+        * Purge finished schedules (where ``next_run_time`` is ``None``) that have no
+          running jobs associated with them
+        """
 
 
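To make the intent of the new lease methods concrete, here is a hypothetical heartbeat
loop, not part of this commit, showing how a scheduler could keep re-asserting ownership
of its running jobs; the interval value is an arbitrary choice for the sketch:

    from uuid import UUID

    import anyio

    from apscheduler.abc import DataStore


    async def job_lease_heartbeat(
        data_store: DataStore,
        scheduler_id: str,
        running_job_ids: set[UUID],  # shared set, mutated as jobs start/finish
        interval: float = 15.0,
    ) -> None:
        # Re-assert ownership of still-running jobs so that another scheduler's
        # cleanup() never releases them as abandoned while this one is alive
        while True:
            await anyio.sleep(interval)
            if running_job_ids:
                await data_store.extend_acquired_job_leases(
                    scheduler_id, set(running_job_ids)
                )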
4 changes: 3 additions & 1 deletion src/apscheduler/datastores/base.py
@@ -1,10 +1,12 @@
 from __future__ import annotations
 
 from contextlib import AsyncExitStack
+from datetime import timedelta
 from logging import Logger
 
 import attrs
 
+from .._converters import as_timedelta
 from .._retry import RetryMixin
 from ..abc import DataStore, EventBroker, Serializer
 from ..serializers.pickle import PickleSerializer
@@ -19,7 +21,7 @@ class BaseDataStore(DataStore):
         can keep a lock on a schedule or task
     """
 
-    lock_expiration_delay: float = 30
+    lock_expiration_delay: timedelta = attrs.field(converter=as_timedelta, default=30)
     _event_broker: EventBroker = attrs.field(init=False)
     _logger: Logger = attrs.field(init=False)
 
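The ``as_timedelta`` converter imported here is what lets ``lock_expiration_delay=30``
keep working while the attribute is always a ``timedelta`` afterwards. A rough sketch of
what such a converter does (the real one in ``apscheduler._converters`` may handle more
cases):

    from __future__ import annotations

    from datetime import timedelta


    def as_timedelta(value: float | timedelta) -> timedelta:
        # Interpret bare numbers as seconds; pass timedelta instances through
        if isinstance(value, (int, float)):
            return timedelta(seconds=value)
        return value

Since attrs runs converters at initialization time, expressions such as
``now + self.lock_expiration_delay`` in the data stores below can rely on always getting
a ``timedelta``.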
38 changes: 32 additions & 6 deletions src/apscheduler/datastores/memory.py
@@ -2,13 +2,14 @@
 
 from bisect import bisect_left, bisect_right, insort_right
 from collections import defaultdict
-from datetime import MAXYEAR, datetime, timedelta, timezone
+from datetime import MAXYEAR, datetime, timezone
 from functools import partial
 from typing import Iterable
 from uuid import UUID
 
 import attrs
 
+from .. import JobOutcome
 from .._enums import ConflictPolicy
 from .._events import (
     JobAcquired,
@@ -213,15 +214,15 @@ async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedule]:
             elif state.schedule.paused:
                 # The schedule is paused
                 continue
-            elif state.acquired_by is not None:
+            elif state.acquired_until is not None:
                 if state.acquired_by != scheduler_id and now <= state.acquired_until:
                     # The schedule has been acquired by another scheduler and the
                     # timeout has not expired yet
                     continue
 
             schedules.append(state.schedule)
             state.acquired_by = scheduler_id
-            state.acquired_until = now + timedelta(seconds=self.lock_expiration_delay)
+            state.acquired_until = now + self.lock_expiration_delay
             if len(schedules) == limit:
                 break
 
@@ -296,9 +297,7 @@ async def acquire_jobs(
                 # Mark the job as acquired by this worker
                 jobs.append(job_state.job)
                 job_state.acquired_by = scheduler_id
-                job_state.acquired_until = now + timedelta(
-                    seconds=self.lock_expiration_delay
-                )
+                job_state.acquired_until = now + self.lock_expiration_delay
 
                 # Increment the number of running jobs for this task
                 task_state.running_jobs += 1
@@ -353,6 +352,25 @@ async def release_job(self, scheduler_id: str, job: Job, result: JobResult) -> None:
     async def get_job_result(self, job_id: UUID) -> JobResult | None:
         return self._job_results.pop(job_id, None)
 
+    async def extend_acquired_schedule_leases(
+        self, scheduler_id: str, schedule_ids: set[str]
+    ) -> None:
+        new_acquired_until = datetime.now(timezone.utc) + self.lock_expiration_delay
+        for schedule_state in self._schedules:
+            if (
+                schedule_state.acquired_by == scheduler_id
+                and schedule_state.schedule.id in schedule_ids
+            ):
+                schedule_state.acquired_until = new_acquired_until
+
+    async def extend_acquired_job_leases(
+        self, scheduler_id: str, job_ids: set[UUID]
+    ) -> None:
+        new_acquired_until = datetime.now(timezone.utc) + self.lock_expiration_delay
+        for job_state in self._jobs:
+            if job_state.acquired_by == scheduler_id and job_state.job.id in job_ids:
+                job_state.acquired_until = new_acquired_until
+
     async def cleanup(self) -> None:
         # Clean up expired job results
         now = datetime.now(timezone.utc)
@@ -364,6 +382,14 @@ async def cleanup(self) -> None:
         for job_id in expired_job_ids:
             del self._job_results[job_id]
 
+        # Finish any jobs whose leases have expired
+        for job_state in self._jobs:
+            if job_state.acquired_until is not None and job_state.acquired_until < now:
+                result = JobResult.from_job(
+                    job=job_state.job, outcome=JobOutcome.cancelled, finished_at=now
+                )
+                await self.release_job(job_state.acquired_by, job_state.job, result)
+
         # Clean up finished schedules that have no running jobs
         finished_schedule_ids = [
             schedule_id
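As a pure illustration of the lease mechanics above (no real scheduler or data store
involved; the numbers are arbitrary):

    from datetime import datetime, timedelta, timezone

    lock_expiration_delay = timedelta(seconds=30)
    acquired_until = datetime.now(timezone.utc) + lock_expiration_delay

    # While the job runs, extend_acquired_job_leases() keeps pushing the
    # deadline forward:
    acquired_until = datetime.now(timezone.utc) + lock_expiration_delay

    # If the owning scheduler dies, the deadline stops moving; simulate a
    # cleanup() run 60 seconds later:
    now = datetime.now(timezone.utc) + timedelta(seconds=60)
    assert acquired_until < now  # cleanup() would release this job as cancelled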
[Diffs for the remaining three changed files are not shown.]