Release v0.6.0
jcass77 committed Jun 17, 2021
2 parents 94dae80 + 850cbd0 commit eb0310f
Showing 15 changed files with 559 additions and 65 deletions.
96 changes: 80 additions & 16 deletions README.md
@@ -46,29 +46,31 @@ So for now your options are to either:

Features of this package include:

- A custom `DjangoJobStore`: an [APScheduler job store](https://apscheduler.readthedocs.io/en/latest/extending.html#custom-job-stores)
- A custom `DjangoJobStore`:
an [APScheduler job store](https://apscheduler.readthedocs.io/en/latest/extending.html#custom-job-stores)
that persists scheduled jobs to the Django database. You can view the scheduled jobs and monitor the job execution
directly via the Django admin interface:
![Jobs](docs/screenshots/job_overview.png)

![Jobs](https://raw.githubusercontent.com/jcass77/django-apscheduler/main/docs/screenshots/job_overview.png)

- The job store also maintains a history of all job executions of the currently scheduled jobs, along with status codes
and exceptions (if any):
![Jobs](docs/screenshots/execution_overview.png)

![Jobs](https://raw.githubusercontent.com/jcass77/django-apscheduler/main/docs/screenshots/execution_overview.png)

- **Note:** APScheduler will [automatically remove jobs](https://apscheduler.readthedocs.io/en/latest/userguide.html#removing-jobs)
from the job store as soon as their last scheduled execution has been triggered. This will also delete the
corresponding job execution entries from the database (i.e. job execution logs are only maintained for 'active' jobs.)

- Job executions can also be triggered manually via the `DjangoJob` admin page:

![Jobs](docs/screenshots/run_now.png)
![Jobs](https://raw.githubusercontent.com/jcass77/django-apscheduler/main/docs/screenshots/run_now.png)

- **Note:** In order to prevent long running jobs from causing the Django HTTP request to time out, the combined maximum
run time for all APScheduler jobs that are started via the Django admin site is 25 seconds. This timeout value can be
configured via the `APSCHEDULER_RUN_NOW_TIMEOUT` setting.
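
As a rough illustration of the note above, the timeout could be overridden in a project's `settings.py` along these lines. The value `60` is an arbitrary example, not a recommendation. Note also that the `admin.py` code shown further down in this commit reads the setting with `getattr(settings, "APSCHEDULER_RUN_NOW_TIMEOUT", 15)`, so the effective default when the setting is absent appears to be 15 seconds in that code path.

```python
# settings.py (fragment)
# Maximum combined run time, in seconds, for jobs started from the Django admin.
# 60 is an arbitrary illustrative value.
APSCHEDULER_RUN_NOW_TIMEOUT = 60
```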


Installation
------------

@@ -123,27 +125,37 @@ from apscheduler.triggers.cron import CronTrigger
from django.core.management.base import BaseCommand
from django_apscheduler.jobstores import DjangoJobStore
from django_apscheduler.models import DjangoJobExecution

from django_apscheduler import util

logger = logging.getLogger(__name__)


def my_job():
    # Your job processing logic here...
    # Your job processing logic here...
    pass


# The `close_old_connections` decorator ensures that database connections, that have become unusable or are obsolete,
# are closed before and after our job has run.
@util.close_old_connections
def delete_old_job_executions(max_age=604_800):
    """This job deletes all apscheduler job executions older than `max_age` from the database."""
    """
    This job deletes APScheduler job execution entries older than `max_age` from the database. It helps to prevent the
    database from filling up with old historical records that are no longer useful.
    :param max_age: The maximum length of time to retain historical job execution records. Defaults
    to 7 days.
    """
    DjangoJobExecution.objects.delete_old_job_executions(max_age)


class Command(BaseCommand):
    help = "Runs apscheduler."
    help = "Runs APScheduler."

    def handle(self, *args, **options):
        scheduler = BlockingScheduler(timezone=settings.TIME_ZONE)
        scheduler.add_jobstore(DjangoJobStore(), "default")

        scheduler.add_job(
            my_job,
            trigger=CronTrigger(second="*/10"),  # Every 10 seconds
@@ -188,16 +200,68 @@ Advanced Usage
--------------

django-apscheduler assumes that you are already familiar with APScheduler and its proper use. If not, then please head
over to the project page and have a look through the [APScheduler documentation](https://apscheduler.readthedocs.io/en/latest/index.html).
over to the project page and have a look through
the [APScheduler documentation](https://apscheduler.readthedocs.io/en/latest/index.html).

It is possible to make use of [different types of schedulers](https://apscheduler.readthedocs.io/en/latest/userguide.html#choosing-the-right-scheduler-job-store-s-executor-s-and-trigger-s)
It is possible to make use
of [different types of schedulers](https://apscheduler.readthedocs.io/en/latest/userguide.html#choosing-the-right-scheduler-job-store-s-executor-s-and-trigger-s)
depending on your environment and use case. If you would prefer running a `BackgroundScheduler` instead of using a
`BlockingScheduler`, then you should be aware that using APScheduler with uWSGI requires some additional
[configuration steps](https://apscheduler.readthedocs.io/en/latest/faq.html#how-can-i-use-apscheduler-with-uwsgi) in
order to re-enable threading support.


## Project resources


Supported Databases
-------------------

Please take note of the list of databases that
are [officially supported by Django](https://docs.djangoproject.com/en/dev/ref/databases/#databases). django-apscheduler
probably won't work with unsupported databases like Microsoft SQL Server, MongoDB, and the like.


Database Connections and Timeouts
---------------------------------

django-apscheduler is dependent on the standard Django
database [configuration settings](https://docs.djangoproject.com/en/dev/ref/databases/#general-notes). These settings,
in combination with how your database server has been configured, determine how connection management will be performed
for your specific deployment.

The `close_old_connections` decorator should be applied to APScheduler jobs that require database access. Doing so
ensures that Django's [CONN_MAX_AGE](https://docs.djangoproject.com/en/dev/ref/settings/#std:setting-CONN_MAX_AGE)
configuration setting is enforced before and after your job is run. This mirrors the standard Django functionality of
doing the same before and after handling each HTTP request.
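
The before-and-after behaviour described above can be sketched as a plain decorator. This is a minimal illustration and not the package's actual `util.close_old_connections` source, which is not shown in this diff. The names `cleanup_before_and_after` and `fake_close_old_connections` are hypothetical; in a real Django project the cleanup callable would be `django.db.close_old_connections`.

```python
from functools import wraps


def cleanup_before_and_after(cleanup):
    """Build a decorator that calls `cleanup()` before and after the wrapped job."""

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            cleanup()  # Drop stale connections before the job starts.
            try:
                return func(*args, **kwargs)
            finally:
                cleanup()  # Run again afterwards, even if the job raised.

        return wrapper

    return decorator


# Stand-in for django.db.close_old_connections so the sketch runs without Django.
calls = []


def fake_close_old_connections():
    calls.append("cleanup")


@cleanup_before_and_after(fake_close_old_connections)
def my_job():
    calls.append("job")
    return "done"


result = my_job()
```

Using `try`/`finally` means the second cleanup still runs when the job fails, which matches the "before and after" wording of the README text.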

If you still encounter any kind of 'lost database connection' errors then it probably means that:

- Your database connections timed out in the middle of executing a job. You should probably consider incorporating a
connection pooler as part of your deployment for more robust database connection management
(e.g. [pgbouncer](https://www.pgbouncer.org) for PostgreSQL, or the equivalent for other DB platforms).
- Your database server has crashed / been restarted.
Django [will not reconnect automatically](https://code.djangoproject.com/ticket/24810)
and you need to re-start django-apscheduler as well.
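
This commit also applies a `util.retry_on_db_operational_error` decorator to the job store methods (see the `jobstores.py` changes below), but its implementation is not included in this excerpt. The following is only a guess at the general shape of such a decorator: catch the database error once, reset connections, and retry. `FakeOperationalError`, `retry_once_on`, and `reset_connections` are hypothetical stand-ins; with Django, the exception would be `django.db.OperationalError` and the reset step would likely be `django.db.close_old_connections`.

```python
import logging
from functools import wraps

logger = logging.getLogger(__name__)


class FakeOperationalError(Exception):
    """Stand-in for django.db.OperationalError so the sketch runs without Django."""


def retry_once_on(error_type, reset):
    """Build a decorator that retries the wrapped call once after `error_type`."""

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except error_type as exc:
                logger.warning("Database error %r; resetting and retrying once.", exc)
                reset()  # e.g. close unusable connections so a fresh one is opened
                return func(*args, **kwargs)  # A second failure propagates.

        return wrapper

    return decorator


attempts = []


def reset_connections():
    attempts.append("reset")


@retry_once_on(FakeOperationalError, reset_connections)
def lookup_job(job_id):
    attempts.append("call")
    if attempts.count("call") == 1:
        raise FakeOperationalError("server closed the connection")
    return job_id


found = lookup_job("job-1")
```

A single retry only helps with transient failures such as a dropped idle connection. If the database server stays down, the second attempt fails too and the error surfaces as before.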

Common footguns
---------------

Unless you have a very specific set of requirements, and have intimate knowledge of the inner workings of APScheduler,
you shouldn't be using `BackgroundScheduler`. This can lead to all sorts of temptations like:

* Firing up a scheduler inside of a Django view. This will most likely cause more than one scheduler to run concurrently
and lead to jobs running multiple times (see the above introduction to this README for a more thorough treatment of
the subject).
* Bootstrapping a scheduler somewhere else inside of your Django application. It feels like this should solve the
problem mentioned above and guarantee that only one scheduler is running. The downside is that you have just delegated
all of your background task processing to whatever webserver you are using (Gunicorn, uWSGI, etc.). It will probably
kill any long-running threads (your jobs) with extreme prejudice (thinking that they are caused by misbehaving HTTP
requests).

Relying on `BlockingScheduler` forces you to run APScheduler in its own dedicated process that is not handled or
monitored by the webserver. The example code provided in `runapscheduler.py` above is a good starting point.


Project resources
-----------------

- [Changelog](docs/changelog.md)
- [Release procedures](docs/releasing.md)
49 changes: 26 additions & 23 deletions django_apscheduler/admin.py
@@ -9,7 +9,7 @@
from django.utils import timezone
from django.utils.html import format_html
from django.utils.safestring import mark_safe
from django.utils.translation import gettext as _
from django.utils.translation import gettext_lazy as _

from django_apscheduler.models import DjangoJob, DjangoJobExecution
from django_apscheduler import util
@@ -27,7 +27,8 @@ def __init__(self, model, admin_site):
        self._django_jobstore = DjangoJobStore()
        self._memory_jobstore = DjangoMemoryJobStore()

        self._jobs_executed = []
        self._jobs_scheduled = None
        self._jobs_executed = None
        self._job_execution_timeout = getattr(
            settings, "APSCHEDULER_RUN_NOW_TIMEOUT", 15
        )
@@ -58,7 +59,7 @@ def average_duration(self, obj):
        except DjangoJobExecution.DoesNotExist:
            return "None"

    average_duration.short_description = "Average Duration (sec)"
    average_duration.short_description = _("Average Duration (sec)")

    actions = ["run_selected_jobs"]

@@ -69,21 +70,16 @@ def run_selected_jobs(self, request, queryset):

        scheduler.start()

        num_jobs_scheduled = 0
        self._jobs_executed = []
        self._jobs_scheduled = set()
        self._jobs_executed = set()
        start_time = timezone.now()

        for item in queryset:
            django_job = self._django_jobstore.lookup_job(item.id)

            if not django_job:
                msg_dict = {"job_id": item.id}
                msg = _(
                    "Could not find job {job_id} in the database! Skipping execution..."
                )
                self.message_user(
                    request, format_html(msg, **msg_dict), messages.WARNING
                )
                msg = _("Could not find job {} in the database! Skipping execution...")
                self.message_user(request, format_html(msg, item.id), messages.WARNING)
                continue

scheduler.add_job(
@@ -98,35 +94,42 @@ def run_selected_jobs(self, request, queryset):
                max_instances=django_job.max_instances,
            )

            num_jobs_scheduled += 1
            self._jobs_scheduled.add(django_job.id)

        while len(self._jobs_executed) < num_jobs_scheduled:
        while self._jobs_scheduled != self._jobs_executed:
            # Wait for selected jobs to be executed.
            if timezone.now() > start_time + timedelta(
                seconds=self._job_execution_timeout
            ):
                msg = _(
                    "Maximum runtime exceeded! Not all jobs could be completed successfully."
                    "Maximum runtime of {} seconds exceeded! Not all jobs could be completed successfully. "
                    "Pending jobs: {}"
                )
                self.message_user(
                    request,
                    format_html(
                        msg,
                        self._job_execution_timeout,
                        ",".join(self._jobs_scheduled - self._jobs_executed),
                    ),
                    messages.ERROR,
                )
                self.message_user(request, msg, messages.ERROR)

                scheduler.shutdown(wait=False)
                return None

            time.sleep(0.1)

        for job_id in self._jobs_executed:
            msg_dict = {"job_id": job_id}
            msg = _("Executed job '{job_id}'!")
            self.message_user(request, format_html(msg, **msg_dict))
            self.message_user(request, format_html(_("Executed job '{}'!"), job_id))

        scheduler.shutdown()
        return None

    def _handle_execution_event(self, event: events.JobExecutionEvent):
        self._jobs_executed.append(event.job_id)
        self._jobs_executed.add(event.job_id)

    run_selected_jobs.short_description = "Run the selected django jobs"
    run_selected_jobs.short_description = _("Run the selected django jobs")


@admin.register(DjangoJobExecution)
@@ -153,5 +156,5 @@ def local_run_time(self, obj):
    def duration_text(self, obj):
        return obj.duration or "N/A"

    html_status.short_description = "Status"
    duration_text.short_description = "Duration (sec)"
    html_status.short_description = _("Status")
    duration_text.short_description = _("Duration (sec)")
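
The `admin.py` change above replaces a counter comparison with two sets, so the wait loop can report exactly which jobs are still pending when the timeout expires. A minimal standalone sketch of that pattern follows. The function name `wait_for_jobs`, the job ids, and the short timings are illustrative assumptions, and a plain thread stands in for the scheduler's execution events.

```python
import threading
import time


def wait_for_jobs(scheduled, executed, timeout, poll_interval=0.01):
    """Poll until `executed` equals `scheduled`; return the ids still pending on timeout."""
    deadline = time.monotonic() + timeout
    while scheduled != executed:
        if time.monotonic() > deadline:
            return scheduled - executed  # Set difference: jobs that never reported back.
        time.sleep(poll_interval)
    return set()


scheduled = {"job-a", "job-b"}
executed = set()


def finish(job_id, delay):
    # Stand-in for an execution event listener adding the finished job's id.
    time.sleep(delay)
    executed.add(job_id)


# Only job-a ever finishes, so job-b is reported as pending after the timeout.
worker = threading.Thread(target=finish, args=("job-a", 0.02))
worker.start()
pending = wait_for_jobs(scheduled, executed, timeout=0.2)
worker.join()

message = "Pending jobs: {}".format(",".join(sorted(pending)))
```

Compared with counting events, a set is unaffected if the same job id is reported more than once, and the set difference gives the pending ids for the error message directly.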
11 changes: 10 additions & 1 deletion django_apscheduler/jobstores.py
@@ -13,6 +13,7 @@
from django import db
from django.db import transaction, IntegrityError

from django_apscheduler import util
from django_apscheduler.models import DjangoJob, DjangoJobExecution
from django_apscheduler.util import (
    get_apscheduler_datetime,
@@ -181,7 +182,8 @@ class is used for each event code.
        )

        self._scheduler.add_listener(
            self.handle_error_event, events.EVENT_JOB_ERROR | events.EVENT_JOB_MISSED,
            self.handle_error_event,
            events.EVENT_JOB_ERROR | events.EVENT_JOB_MISSED,
        )


@@ -199,6 +201,7 @@ def __init__(self, pickle_protocol: int = pickle.HIGHEST_PROTOCOL):
        super().__init__()
        self.pickle_protocol = pickle_protocol

    @util.retry_on_db_operational_error
    def lookup_job(self, job_id: str) -> Union[None, AppSchedulerJob]:
        try:
            job_state = DjangoJob.objects.get(id=job_id).job_state
@@ -211,6 +214,7 @@ def get_due_jobs(self, now) -> List[AppSchedulerJob]:
        dt = get_django_internal_datetime(now)
        return self._get_jobs(next_run_time__lte=dt)

    @util.retry_on_db_operational_error
    def get_next_run_time(self):
        try:
            job = DjangoJob.objects.filter(next_run_time__isnull=False).earliest(
@@ -227,6 +231,7 @@ def get_all_jobs(self):

        return jobs

    @util.retry_on_db_operational_error
    def add_job(self, job: AppSchedulerJob):
        with transaction.atomic():
            try:
@@ -238,6 +243,7 @@ def add_job(self, job: AppSchedulerJob):
            except IntegrityError:
                raise ConflictingIdError(job.id)

    @util.retry_on_db_operational_error
    def update_job(self, job: AppSchedulerJob):
        # Acquire lock for update
        with transaction.atomic():
@@ -254,12 +260,14 @@ def update_job(self, job: AppSchedulerJob):
            except DjangoJob.DoesNotExist:
                raise JobLookupError(job.id)

    @util.retry_on_db_operational_error
    def remove_job(self, job_id: str):
        try:
            DjangoJob.objects.get(id=job_id).delete()
        except DjangoJob.DoesNotExist:
            raise JobLookupError(job_id)

    @util.retry_on_db_operational_error
    def remove_all_jobs(self):
        # Implicit: will also delete all DjangoJobExecutions due to on_delete=models.CASCADE
        DjangoJob.objects.all().delete()
@@ -276,6 +284,7 @@ def _reconstitute_job(self, job_state):

        return job

    @util.retry_on_db_operational_error
    def _get_jobs(self, **filters):
        jobs = []
        failed_job_ids = set()
8 changes: 6 additions & 2 deletions django_apscheduler/migrations/0001_initial.py
@@ -29,7 +29,9 @@ class Migration(migrations.Migration):
                ("next_run_time", models.DateTimeField(db_index=True)),
                ("job_state", models.BinaryField()),
            ],
            options={"ordering": ("next_run_time",),},
            options={
                "ordering": ("next_run_time",),
            },
        ),
migrations.CreateModel(
name="DjangoJobExecution",
@@ -88,6 +90,8 @@ class Migration(migrations.Migration):
                    ),
                ),
            ],
            options={"ordering": ("-run_time",),},
            options={
                "ordering": ("-run_time",),
            },
        ),
    ]
5 changes: 4 additions & 1 deletion django_apscheduler/migrations/0006_remove_djangojob_name.py
@@ -10,5 +10,8 @@ class Migration(migrations.Migration):
    ]

    operations = [
        migrations.RemoveField(model_name="djangojob", name="name",),
        migrations.RemoveField(
            model_name="djangojob",
            name="name",
        ),
    ]
Original file line number Diff line number Diff line change
@@ -10,5 +10,8 @@ class Migration(migrations.Migration):
    ]

    operations = [
        migrations.RemoveField(model_name="djangojobexecution", name="started",),
        migrations.RemoveField(
            model_name="djangojobexecution",
            name="started",
        ),
    ]