Skip to content

Commit

Permalink
Added schedule option for specifying job result expiration time
Browse files Browse the repository at this point in the history
Closes #927.
  • Loading branch information
agronholm committed Jul 16, 2024
1 parent b702243 commit 82aa204
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 2 deletions.
3 changes: 3 additions & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ APScheduler, see the :doc:`migration section <migration>`.
- **BREAKING** Replaced the data store ``lock_expiration_delay`` parameter with a new
scheduler-level parameter, ``lease_duration`` which is then used to call the various
data store methods
- **BREAKING** Added the ``job_result_expiration_time`` field to the ``Schedule`` class,
to allow the job results from scheduled jobs to stay around for some time
(`#927 <https://github.com/agronholm/apscheduler/issues/927>`_)
- Added the ``psycopg`` event broker
- Added useful indexes and removed useless ones in ``SQLAlchemyDatastore`` and
``MongoDBDataStore``
Expand Down
11 changes: 9 additions & 2 deletions src/apscheduler/_schedulers/async_.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ async def add_schedule(
misfire_grace_time: float | timedelta | None | UnsetValue = unset,
max_jitter: float | timedelta | None = None,
max_running_jobs: int | None | UnsetValue = unset,
job_result_expiration_time: float | timedelta = 0,
conflict_policy: ConflictPolicy = ConflictPolicy.do_nothing,
) -> str:
"""
Expand All @@ -440,8 +441,10 @@ async def add_schedule(
fire times have become due for this schedule since the last processing
:param misfire_grace_time: maximum number of seconds the scheduled job's actual
run time is allowed to be late, compared to the scheduled run time
:param max_jitter: maximum number of seconds to randomly add to the scheduled
time for each job created from this schedule
:param max_jitter: maximum time (in seconds, or as a timedelta) to randomly add
to the scheduled time for each job created from this schedule
:param job_result_expiration_time: minimum time (in seconds, or as a timedelta)
to keep the job results in storage from the jobs created by this schedule
:param max_running_jobs: maximum number of instances of the task that are
allowed to run concurrently (if not set, uses the default misfire grace time
from the associated task, or ``None`` if there is no existing task)
Expand Down Expand Up @@ -492,6 +495,7 @@ async def add_schedule(
if misfire_grace_time is unset
else misfire_grace_time,
max_jitter=max_jitter,
job_result_expiration_time=job_result_expiration_time,
)
schedule.next_fire_time = trigger.next()
await self.data_store.add_schedule(schedule, conflict_policy)
Expand Down Expand Up @@ -683,6 +687,8 @@ def listener(event: JobReleased) -> None:
else:
raise JobLookupError(job_id)

# TODO: this might return None if the job didn't save its result, so deal with
# that
return await self.data_store.get_job_result(job_id)

async def run_job(
Expand Down Expand Up @@ -950,6 +956,7 @@ async def extend_schedule_leases(schedules: Sequence[Schedule]) -> None:
scheduled_fire_time=fire_time,
jitter=jitter,
start_deadline=start_deadline,
result_expiration_time=schedule.job_result_expiration_time,
)
await self.data_store.add_job(job)

Expand Down
8 changes: 8 additions & 0 deletions src/apscheduler/_structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ class Schedule:
run time
:var ~datetime.timedelta | None max_jitter: maximum number of seconds to randomly
add to the scheduled time for each job created from this schedule
:var ~datetime.timedelta job_result_expiration_time: minimum time to keep the job
results in storage from the jobs created by this schedule
:var ~datetime.datetime next_fire_time: the next time the task will be run
:var ~datetime.datetime | None last_fire_time: the last time the task was scheduled
to run
Expand Down Expand Up @@ -135,6 +137,12 @@ class Schedule:
validator=optional(instance_of(timedelta)),
on_setattr=frozen,
)
job_result_expiration_time: timedelta = attrs.field(
default=0,
converter=as_timedelta,
validator=optional(instance_of(timedelta)),
on_setattr=frozen,
)
next_fire_time: datetime | None = attrs.field(
converter=as_aware_datetime,
default=None,
Expand Down
1 change: 1 addition & 0 deletions src/apscheduler/datastores/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ def get_table_definitions(self) -> MetaData:
Column("coalesce", Enum(CoalescePolicy, metadata=metadata), nullable=False),
Column("misfire_grace_time", interval_type),
Column("max_jitter", interval_type),
Column("job_result_expiration_time", interval_type),
*next_fire_time_tzoffset_columns,
Column("last_fire_time", timestamp_type),
Column("acquired_by", Unicode(500), index=True),
Expand Down
28 changes: 28 additions & 0 deletions tests/test_schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,34 @@ async def test_pause_unpause_schedule(
assert isinstance(job_added_event, JobAdded)
assert job_added_event.schedule_id == schedule_id

async def test_schedule_job_result_expiration_time(
self, raw_datastore: DataStore, timezone: ZoneInfo
) -> None:
trigger = DateTrigger(datetime.now(timezone))
send, receive = create_memory_object_stream[Event](4)
with send, receive:
async with AsyncExitStack() as exit_stack:
scheduler = await exit_stack.enter_async_context(
AsyncScheduler(data_store=raw_datastore)
)
await scheduler.add_schedule(
dummy_async_job, trigger, id="foo", job_result_expiration_time=10
)
exit_stack.enter_context(scheduler.subscribe(send.send, {JobAdded}))
await scheduler.start_in_background()

# Wait for the scheduled job to be added
with fail_after(3):
event = await receive.receive()
assert isinstance(event, JobAdded)
assert event.schedule_id == "foo"

# Get its result
result = await scheduler.get_job_result(event.job_id)

assert result.outcome is JobOutcome.success
assert result.return_value == "returnvalue"


class TestSyncScheduler:
def test_configure(self) -> None:
Expand Down

0 comments on commit 82aa204

Please sign in to comment.