Skip to content

Commit

Permalink
Restarting rq-scheduler reschedules all periodics (#4302)
Browse files Browse the repository at this point in the history
* add some logging to scheduler

* schedule jobs only if they are not already scheduled

* jobs scheduled with an interval over 24 hours were not repeated

* schedule version_check using standard scheduling

* clean up old jobs that are not part of the definition anymore

* add some tests

* add one more test to verify that reschedules are not done when not necessary

* no need to check for func existence - all jobs have a func to run
  • Loading branch information
Omer Lachish authored Nov 11, 2019
1 parent f19d242 commit e0e94d7
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 18 deletions.
5 changes: 3 additions & 2 deletions redash/cli/rq.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@
from sqlalchemy.orm import configure_mappers

from redash import rq_redis_connection
from redash.schedule import rq_scheduler, schedule_periodic_jobs
from redash.schedule import rq_scheduler, schedule_periodic_jobs, periodic_job_definitions

manager = AppGroup(help="RQ management commands.")


@manager.command()
def scheduler():
    """Build the periodic-job definitions, reconcile them with the
    scheduler's current state, then run the rq-scheduler loop."""
    jobs = periodic_job_definitions()
    schedule_periodic_jobs(jobs)
    rq_scheduler.run()


Expand Down
55 changes: 39 additions & 16 deletions redash/schedule.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from __future__ import absolute_import
import logging
import hashlib
import json
from datetime import datetime, timedelta
from functools import partial
from random import randint

from rq.job import Job
from rq_scheduler import Scheduler

from redash import settings, rq_redis_connection
Expand All @@ -18,22 +21,29 @@
queue_name="periodic",
interval=5)

def job_id(kwargs):
    """Return a deterministic id for a job definition.

    The id is the SHA-1 hex digest of the definition's kwargs serialized as
    JSON with sorted keys, so an identical definition always yields the same
    id and a changed definition yields a different one.
    """
    metadata = kwargs.copy()
    # Functions are not JSON-serializable; their name identifies them.
    metadata['func'] = metadata['func'].__name__

    return hashlib.sha1(json.dumps(metadata, sort_keys=True).encode()).hexdigest()


def prep(kwargs):
    """Normalize a job definition in place and return it.

    A timedelta interval is converted to a whole number of seconds, and
    result_ttl defaults to twice the interval so a result survives one
    full scheduling cycle.
    """
    interval = kwargs['interval']
    if isinstance(interval, timedelta):
        # total_seconds(), not .seconds: .seconds drops whole days, which
        # broke jobs scheduled with intervals over 24 hours.
        interval = int(interval.total_seconds())

    kwargs['interval'] = interval
    kwargs['result_ttl'] = kwargs.get('result_ttl', interval * 2)

    return kwargs


def schedule(kwargs):
    """Schedule a prepped job definition starting now.

    Passing job_id(kwargs) as the rq-scheduler job id makes scheduling
    idempotent: the same definition always lands on the same id.
    """
    rq_scheduler.schedule(scheduled_time=datetime.utcnow(), id=job_id(kwargs), **kwargs)


def periodic_job_definitions():
jobs = [
{"func": refresh_queries, "interval": 30, "result_ttl": 600},
{"func": empty_schedules, "interval": timedelta(minutes=60)},
Expand All @@ -43,19 +53,32 @@ def schedule_periodic_jobs():
{"func": send_aggregated_errors, "interval": timedelta(minutes=settings.SEND_FAILURE_EMAIL_INTERVAL)}
]

if settings.VERSION_CHECK:
jobs.append({"func": version_check, "interval": timedelta(days=1)})

if settings.QUERY_RESULTS_CLEANUP_ENABLED:
jobs.append({"func": cleanup_query_results, "interval": timedelta(minutes=5)})

if settings.VERSION_CHECK:
# We schedule the version check to run at a random time in order to spread the requests from all users evenly.
rq_scheduler.cron('{minute} {hour} * * *'.format(
minute=randint(0, 59),
hour=randint(0, 23)),
func=version_check)

# Add your own custom periodic jobs in your dynamic_settings module.
jobs.extend(settings.dynamic_settings.periodic_jobs() or [])

for job in jobs:
logger.info("Scheduling %s with interval %s.", job['func'].__name__, job.get('interval'))
schedule(**job)
return jobs


def schedule_periodic_jobs(jobs):
    """Reconcile rq-scheduler's state with the given job definitions.

    Jobs whose ids no longer match any definition are cancelled and
    deleted; definitions not yet present in the scheduler are scheduled.
    Definitions that are already scheduled unchanged are left alone, so
    restarting the scheduler does not reschedule everything.
    """
    job_definitions = [prep(job) for job in jobs]

    # Ids are derived from the prepped definitions; a modified definition
    # therefore gets a new id and its stale counterpart is cleaned up.
    definition_ids = {job_id(job) for job in job_definitions}
    scheduled_ids = {job.id for job in rq_scheduler.get_jobs()}

    jobs_to_clean_up = Job.fetch_many(scheduled_ids - definition_ids, rq_redis_connection)

    jobs_to_schedule = [job for job in job_definitions if job_id(job) not in rq_scheduler]

    for job in jobs_to_clean_up:
        logger.info("Removing %s (%s) from schedule.", job.id, job.func_name)
        rq_scheduler.cancel(job)
        job.delete()

    for job in jobs_to_schedule:
        logger.info("Scheduling %s (%s) with interval %s.", job_id(job), job['func'].__name__, job.get('interval'))
        schedule(job)
61 changes: 61 additions & 0 deletions tests/test_schedule.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from unittest import TestCase
from mock import patch, ANY

from redash.schedule import rq_scheduler, schedule_periodic_jobs

class TestSchedule(TestCase):
    """Exercises schedule_periodic_jobs' reconciliation of the schedule."""

    def setUp(self):
        # Drop leftovers so every test starts from an empty schedule.
        for leftover in rq_scheduler.get_jobs():
            rq_scheduler.cancel(leftover)
            leftover.delete()

    def test_schedules_a_new_job(self):
        def foo():
            pass

        schedule_periodic_jobs([{"func": foo, "interval": 60}])

        scheduled = list(rq_scheduler.get_jobs())

        self.assertEqual(1, len(scheduled))
        self.assertTrue(scheduled[0].func_name.endswith('foo'))
        self.assertEqual(60, scheduled[0].meta['interval'])

    def test_doesnt_reschedule_an_existing_job(self):
        def foo():
            pass

        schedule_periodic_jobs([{"func": foo, "interval": 60}])

        # A second pass with an identical definition must not re-schedule.
        with patch('redash.schedule.rq_scheduler.schedule') as schedule:
            schedule_periodic_jobs([{"func": foo, "interval": 60}])
            schedule.assert_not_called()

    def test_reschedules_a_modified_job(self):
        def foo():
            pass

        schedule_periodic_jobs([{"func": foo, "interval": 60}])
        schedule_periodic_jobs([{"func": foo, "interval": 120}])

        scheduled = list(rq_scheduler.get_jobs())

        self.assertEqual(1, len(scheduled))
        self.assertTrue(scheduled[0].func_name.endswith('foo'))
        self.assertEqual(120, scheduled[0].meta['interval'])

    def test_removes_jobs_that_are_no_longer_defined(self):
        def foo():
            pass

        def bar():
            pass

        schedule_periodic_jobs([{"func": foo, "interval": 60}, {"func": bar, "interval": 90}])
        schedule_periodic_jobs([{"func": foo, "interval": 60}])

        scheduled = list(rq_scheduler.get_jobs())

        self.assertEqual(1, len(scheduled))
        self.assertTrue(scheduled[0].func_name.endswith('foo'))
        self.assertEqual(60, scheduled[0].meta['interval'])

0 comments on commit e0e94d7

Please sign in to comment.