Added automatic lease extension for acquired schedules and jobs
With this, other schedulers will never touch schedules and jobs belonging to an actively running scheduler.
agronholm committed Jul 16, 2024
1 parent 5568fdc commit e91c8de
Showing 9 changed files with 260 additions and 171 deletions.
3 changes: 3 additions & 0 deletions docs/versionhistory.rst
@@ -22,6 +22,9 @@ APScheduler, see the :doc:`migration section <migration>`.
 - **BREAKING** Refactored the ``release_schedules()`` data store method to take a
   sequence of ``ScheduleResult`` instances instead of a sequence of schedules, to enable
   the memory data store to handle schedule updates more efficiently
+- **BREAKING** Replaced the data store ``lock_expiration_delay`` parameter with a new
+  scheduler-level parameter, ``lease_duration``, which is then passed to the various
+  data store methods
 - Added the ``psycopg`` event broker
 - Added useful indexes and removed useless ones in ``SQLAlchemyDatastore`` and
   ``MongoDBDataStore``
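For readers following the breaking change above, a minimal before/after sketch of the new configuration style (the engine URL and the exact constructor shapes are illustrative assumptions, not text from this diff):

from datetime import timedelta

from apscheduler import AsyncScheduler
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine("postgresql+asyncpg://localhost/testdb")  # placeholder URL

# Before: the lease length was a data store setting
# data_store = SQLAlchemyDataStore(engine, lock_expiration_delay=30)

# After: it is configured once, on the scheduler itself, and passed along to
# the data store methods on each call
data_store = SQLAlchemyDataStore(engine)
scheduler = AsyncScheduler(data_store, lease_duration=timedelta(seconds=30))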
226 changes: 132 additions & 94 deletions src/apscheduler/_schedulers/async_.py
@@ -98,6 +98,8 @@ class AsyncScheduler:
     :param default_job_executor: name of the default job executor
     :param cleanup_interval: interval (as seconds or timedelta) between automatic
         calls to :meth:`cleanup` – ``None`` to disable automatic clean-up
+    :param lease_duration: maximum amount of time (as seconds or timedelta) that
+        the scheduler can keep a lock on a schedule or task
     :param logger: the logger instance used to log events from the scheduler, data store
         and event broker
     """
@@ -127,14 +129,15 @@ class AsyncScheduler:
         validator=optional(instance_of(timedelta)),
         default=timedelta(minutes=15),
     )
+    lease_duration: timedelta = attrs.field(converter=as_timedelta, default=30)
     logger: Logger = attrs.field(kw_only=True, default=getLogger(__name__))
 
     _state: RunState = attrs.field(init=False, default=RunState.stopped)
     _services_task_group: TaskGroup | None = attrs.field(init=False, default=None)
     _exit_stack: AsyncExitStack = attrs.field(init=False)
     _services_initialized: bool = attrs.field(init=False, default=False)
     _scheduler_cancel_scope: CancelScope | None = attrs.field(init=False, default=None)
-    _running_jobs: set[UUID] = attrs.field(init=False, factory=set)
+    _running_jobs: set[Job] = attrs.field(init=False, factory=set)
     _task_callables: dict[str, Callable] = attrs.field(init=False, factory=dict)
 
     def __attrs_post_init__(self) -> None:
@@ -845,6 +848,14 @@ async def schedule_added_or_modified(event: Event) -> None:
                 )
                 wakeup_event.set()
 
+        async def extend_schedule_leases(schedules: Sequence[Schedule]) -> None:
+            schedule_ids = {schedule.id for schedule in schedules}
+            while True:
+                await sleep(self.lease_duration.total_seconds() / 2)
+                await self.data_store.extend_acquired_schedule_leases(
+                    self.identity, schedule_ids, self.lease_duration
+                )
+
         subscription = self.event_broker.subscribe(
             schedule_added_or_modified, {ScheduleAdded, ScheduleUpdated}
         )
@@ -854,93 +865,102 @@ async def schedule_added_or_modified(event: Event) -> None:
             await self.get_next_event(SchedulerStarted)
 
             while self._state is RunState.started:
-                schedules = await self.data_store.acquire_schedules(self.identity, 100)
-                now = datetime.now(timezone.utc)
-                results: list[ScheduleResult] = []
-                for schedule in schedules:
-                    # Calculate a next fire time for the schedule, if possible
-                    fire_times = [schedule.next_fire_time]
-                    calculate_next = schedule.trigger.next
-                    while True:
-                        try:
-                            fire_time = calculate_next()
-                        except Exception:
-                            self.logger.exception(
-                                "Error computing next fire time for schedule %r of "
-                                "task %r – removing schedule",
-                                schedule.id,
-                                schedule.task_id,
-                            )
-                            break
-
-                        # Stop if the calculated fire time is in the future
-                        if fire_time is None or fire_time > now:
-                            next_fire_time = fire_time
-                            break
-
-                        # Only keep all the fire times if coalesce policy = "all"
-                        if schedule.coalesce is CoalescePolicy.all:
-                            fire_times.append(fire_time)
-                        elif schedule.coalesce is CoalescePolicy.latest:
-                            fire_times[0] = fire_time
-
-                    # Add one or more jobs to the job queue
-                    max_jitter = (
-                        schedule.max_jitter.total_seconds()
-                        if schedule.max_jitter
-                        else 0
-                    )
-                    for i, fire_time in enumerate(fire_times):
-                        # Calculate a jitter if max_jitter > 0
-                        jitter = _zero_timedelta
-                        if max_jitter:
-                            if i + 1 < len(fire_times):
-                                following_fire_time = fire_times[i + 1]
-                            else:
-                                following_fire_time = next_fire_time
-
-                            if following_fire_time is not None:
-                                # Jitter must never be so high that it would cause a
-                                # fire time to equal or exceed the next fire time
-                                max_jitter = min(
-                                    [
-                                        max_jitter,
-                                        (
-                                            following_fire_time
-                                            - fire_time
-                                            - _microsecond_delta
-                                        ).total_seconds(),
-                                    ]
-                                )
-
-                            jitter = timedelta(seconds=random.uniform(0, max_jitter))
-                            fire_time += jitter
-
-                        if schedule.misfire_grace_time is None:
-                            start_deadline = None
-                        else:
-                            start_deadline = fire_time + schedule.misfire_grace_time
-
-                        job = Job(
-                            task_id=schedule.task_id,
-                            args=schedule.args,
-                            kwargs=schedule.kwargs,
-                            schedule_id=schedule.id,
-                            scheduled_fire_time=fire_time,
-                            jitter=jitter,
-                            start_deadline=start_deadline,
-                        )
-                        await self.data_store.add_job(job)
-
-                    results.append(
-                        ScheduleResult(
-                            schedule_id=schedule.id,
-                            task_id=schedule.task_id,
-                            trigger=schedule.trigger,
-                            last_fire_time=fire_times[-1],
-                            next_fire_time=next_fire_time,
-                        )
-                    )
+                schedules = await self.data_store.acquire_schedules(
+                    self.identity, self.lease_duration, 100
+                )
+                async with AsyncExitStack() as exit_stack:
+                    tg = await exit_stack.enter_async_context(create_task_group())
+                    tg.start_soon(extend_schedule_leases, schedules)
+                    exit_stack.callback(tg.cancel_scope.cancel)
+
+                    now = datetime.now(timezone.utc)
+                    results: list[ScheduleResult] = []
+                    for schedule in schedules:
+                        # Calculate a next fire time for the schedule, if possible
+                        fire_times = [schedule.next_fire_time]
+                        calculate_next = schedule.trigger.next
+                        while True:
+                            try:
+                                fire_time = calculate_next()
+                            except Exception:
+                                self.logger.exception(
+                                    "Error computing next fire time for schedule %r of "
+                                    "task %r – removing schedule",
+                                    schedule.id,
+                                    schedule.task_id,
+                                )
+                                break
+
+                            # Stop if the calculated fire time is in the future
+                            if fire_time is None or fire_time > now:
+                                next_fire_time = fire_time
+                                break
+
+                            # Only keep all the fire times if coalesce policy = "all"
+                            if schedule.coalesce is CoalescePolicy.all:
+                                fire_times.append(fire_time)
+                            elif schedule.coalesce is CoalescePolicy.latest:
+                                fire_times[0] = fire_time
+
+                        # Add one or more jobs to the job queue
+                        max_jitter = (
+                            schedule.max_jitter.total_seconds()
+                            if schedule.max_jitter
+                            else 0
+                        )
+                        for i, fire_time in enumerate(fire_times):
+                            # Calculate a jitter if max_jitter > 0
+                            jitter = _zero_timedelta
+                            if max_jitter:
+                                if i + 1 < len(fire_times):
+                                    following_fire_time = fire_times[i + 1]
+                                else:
+                                    following_fire_time = next_fire_time
+
+                                if following_fire_time is not None:
+                                    # Jitter must never be so high that it would cause a
+                                    # fire time to equal or exceed the next fire time
+                                    max_jitter = min(
+                                        [
+                                            max_jitter,
+                                            (
+                                                following_fire_time
+                                                - fire_time
+                                                - _microsecond_delta
+                                            ).total_seconds(),
+                                        ]
+                                    )
+
+                                jitter = timedelta(
+                                    seconds=random.uniform(0, max_jitter)
+                                )
+                                fire_time += jitter
+
+                            if schedule.misfire_grace_time is None:
+                                start_deadline = None
+                            else:
+                                start_deadline = fire_time + schedule.misfire_grace_time
+
+                            job = Job(
+                                task_id=schedule.task_id,
+                                args=schedule.args,
+                                kwargs=schedule.kwargs,
+                                schedule_id=schedule.id,
+                                scheduled_fire_time=fire_time,
+                                jitter=jitter,
+                                start_deadline=start_deadline,
+                            )
+                            await self.data_store.add_job(job)
+
+                        results.append(
+                            ScheduleResult(
+                                schedule_id=schedule.id,
+                                task_id=schedule.task_id,
+                                trigger=schedule.trigger,
+                                last_fire_time=fire_times[-1],
+                                next_fire_time=next_fire_time,
+                            )
+                        )
 
                 # Update the schedules (and release the scheduler's claim on them)
                 await self.data_store.release_schedules(self.identity, results)
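The new wrapper above follows a pattern worth seeing in isolation: a heartbeat task runs alongside the batch work and is cancelled before the leases are released. A minimal, runnable sketch using only anyio (heartbeat() and process_batch() are placeholder names, not APScheduler APIs):

from contextlib import AsyncExitStack

import anyio


async def heartbeat(interval: float) -> None:
    # Stand-in for the extend_acquired_schedule_leases() call; runs until cancelled
    while True:
        await anyio.sleep(interval)
        print("lease extended")


async def process_batch(lease_seconds: float = 30.0) -> None:
    async with AsyncExitStack() as exit_stack:
        tg = await exit_stack.enter_async_context(anyio.create_task_group())
        # Refreshing at half the lease duration gives a 2x safety margin: each
        # successful refresh pushes the deadline a full lease ahead, while the
        # next refresh arrives in half that time
        tg.start_soon(heartbeat, lease_seconds / 2)
        # Registered after the task group was entered, so it runs first on exit
        # (LIFO), cancelling the heartbeat before the task group is joined
        exit_stack.callback(tg.cancel_scope.cancel)

        await anyio.sleep(1)  # stand-in for processing the acquired schedules


anyio.run(process_batch)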
@@ -998,12 +1018,21 @@ async def check_queue_capacity(event: Event) -> None:
             if len(self._running_jobs) < self.max_concurrent_jobs:
                 wakeup_event.set()
 
+        async def extend_job_leases() -> None:
+            while self._state is RunState.started:
+                await sleep(self.lease_duration.total_seconds() / 2)
+                job_ids = {job.id for job in self._running_jobs}
+                await self.data_store.extend_acquired_job_leases(
+                    self.identity, job_ids, self.lease_duration
+                )
+
         async with AsyncExitStack() as exit_stack:
             # Start the job executors
             for job_executor in self.job_executors.values():
                 await job_executor.start(exit_stack)
 
-            task_group = await exit_stack.enter_async_context(create_task_group())
+            outer_tg = await exit_stack.enter_async_context(create_task_group())
+            outer_tg.start_soon(extend_job_leases)
 
             # Fetch new jobs every time
             exit_stack.enter_context(
@@ -1019,14 +1048,23 @@ async def check_queue_capacity(event: Event) -> None:
             while self._state is RunState.started:
                 limit = self.max_concurrent_jobs - len(self._running_jobs)
                 if limit > 0:
-                    jobs = await self.data_store.acquire_jobs(self.identity, limit)
-                    for job in jobs:
-                        task = await self.data_store.get_task(job.task_id)
-                        func = self._get_task_callable(task)
-                        self._running_jobs.add(job.id)
-                        task_group.start_soon(
-                            self._run_job, job, func, task.job_executor
-                        )
+                    jobs = await self.data_store.acquire_jobs(
+                        self.identity, self.lease_duration, limit
+                    )
+                    async with AsyncExitStack() as inner_exit_stack:
+                        inner_tg = await inner_exit_stack.enter_async_context(
+                            create_task_group()
+                        )
+                        inner_exit_stack.callback(inner_tg.cancel_scope.cancel)
+                        inner_tg.start_soon(extend_job_leases)
+
+                        for job in jobs:
+                            task = await self.data_store.get_task(job.task_id)
+                            func = self._get_task_callable(task)
+                            self._running_jobs.add(job)
+                            outer_tg.start_soon(
+                                self._run_job, job, func, task.job_executor
+                            )
 
                 await wakeup_event.wait()
                 wakeup_event = anyio.Event()
@@ -1086,4 +1124,4 @@ async def _run_job(self, job: Job, func: Callable, executor: str) -> None:
             finally:
                 current_job.reset(token)
         finally:
-            self._running_jobs.remove(job.id)
+            self._running_jobs.remove(job)
2 changes: 2 additions & 0 deletions src/apscheduler/_schedulers/sync.py
@@ -50,6 +50,7 @@ def __init__(
         role: SchedulerRole = SchedulerRole.both,
         max_concurrent_jobs: int = 100,
         cleanup_interval: float | timedelta | None = None,
+        lease_duration: timedelta = timedelta(seconds=30),
         job_executors: MutableMapping[str, JobExecutor] | None = None,
         default_job_executor: str | None = None,
         logger: Logger | None = None,
@@ -70,6 +71,7 @@ def __init__(
             max_concurrent_jobs=max_concurrent_jobs,
             job_executors=job_executors or {},
             cleanup_interval=cleanup_interval,
+            lease_duration=lease_duration,
             default_job_executor=default_job_executor,
             logger=logger or logging.getLogger(__name__),
             **kwargs,
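A quick usage sketch for the synchronous scheduler (assuming the default in-memory data store; this mirrors the async parameter above):

from datetime import timedelta

from apscheduler import Scheduler

scheduler = Scheduler(lease_duration=timedelta(seconds=30))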
18 changes: 13 additions & 5 deletions src/apscheduler/abc.py
@@ -4,7 +4,7 @@
 from abc import ABCMeta, abstractmethod
 from collections.abc import Sequence
 from contextlib import AsyncExitStack
-from datetime import datetime
+from datetime import datetime, timedelta
 from logging import Logger
 from typing import TYPE_CHECKING, Any, Callable, Iterable, Iterator
 from uuid import UUID
@@ -247,14 +247,18 @@ async def remove_schedules(self, ids: Iterable[str]) -> None:
"""

@abstractmethod
async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedule]:
async def acquire_schedules(
self, scheduler_id: str, lease_duration: timedelta, limit: int
) -> list[Schedule]:
"""
Acquire unclaimed due schedules for processing.
This method claims up to the requested number of schedules for the given
scheduler and returns them.
:param scheduler_id: unique identifier of the scheduler
:param lease_duration: the duration of the lease, after which the schedules can be
acquired by another scheduler even if ``acquired_by`` is not ``None``
:param limit: maximum number of schedules to claim
:return: the list of claimed schedules
"""
@@ -306,7 +310,7 @@ async def get_jobs(self, ids: Iterable[UUID] | None = None) -> list[Job]:

     @abstractmethod
     async def acquire_jobs(
-        self, scheduler_id: str, limit: int | None = None
+        self, scheduler_id: str, lease_duration: timedelta, limit: int | None = None
     ) -> list[Job]:
         """
         Acquire unclaimed jobs for execution.
@@ -315,6 +319,8 @@ async def acquire_jobs(
         and returns them.
 
         :param scheduler_id: unique identifier of the scheduler
+        :param lease_duration: the duration of the lease, after which the jobs will be
+            considered to be dead if the scheduler doesn't extend the lease duration
         :param limit: maximum number of jobs to claim and return
         :return: the list of claimed jobs
         """
@@ -341,24 +347,26 @@ async def get_job_result(self, job_id: UUID) -> JobResult | None:
"""

async def extend_acquired_schedule_leases(
self, scheduler_id: str, schedule_ids: set[str]
self, scheduler_id: str, schedule_ids: set[str], duration: timedelta
) -> None:
"""
Extend the leases of specified schedules acquired by the given scheduler.
:param scheduler_id: unique identifier of the scheduler
:param schedule_ids: the identifiers of the schedules the scheduler is currently
processing
:param duration: the duration by which to extend the leases
"""

async def extend_acquired_job_leases(
self, scheduler_id: str, job_ids: set[UUID]
self, scheduler_id: str, job_ids: set[UUID], duration: timedelta
) -> None:
"""
Extend the leases of specified jobs acquired by the given scheduler.
:param scheduler_id: unique identifier of the scheduler
:param job_ids: the identifiers of the jobs the scheduler is running
:param duration: the duration by which to extend the leases
"""

@abstractmethod
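And the extension side, in the same hypothetical in-memory style, here for jobs: only leases actually held by the calling scheduler are pushed forward.

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from uuid import UUID


@dataclass
class _JobRecord:  # hypothetical internal state of a data store
    id: UUID
    acquired_by: str | None = None
    acquired_until: datetime | None = None


def extend_job_leases(
    records: dict[UUID, _JobRecord],
    scheduler_id: str,
    job_ids: set[UUID],
    duration: timedelta,
) -> None:
    # If a lease already lapsed and another scheduler claimed the job,
    # acquired_by no longer matches and the record is left untouched
    new_deadline = datetime.now(timezone.utc) + duration
    for job_id in job_ids:
        record = records.get(job_id)
        if record is not None and record.acquired_by == scheduler_id:
            record.acquired_until = new_deadline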