Skip to content

Commit

Permalink
(wip) recurrence rule support
Browse files Browse the repository at this point in the history
  • Loading branch information
concreted committed Jun 26, 2017
1 parent ccac3e1 commit cf3fd9e
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 3 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,7 @@ docs/_build/

# PyBuilder
target/

# VirtualEnv
.venv

35 changes: 35 additions & 0 deletions redbeat/decoder.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# coding: utf-8

import time
from datetime import datetime

try:
Expand All @@ -8,6 +9,7 @@
import json

from celery.schedules import schedule, crontab
from .schedules import rrule


class RedBeatJSONDecoder(json.JSONDecoder):
Expand All @@ -29,6 +31,14 @@ def dict_to_object(self, d):
if objtype == 'crontab':
return crontab(**d)

if objtype == 'rrule':
rrule_dict = d
# Decode timestamp values into datetime objects
for key in ['dtstart', 'until']:
if rrule_dict[key] is not None:
rrule_dict[key] = datetime.fromtimestamp(rrule_dict[key])
return rrule(**rrule_dict)

d['__type__'] = objtype

return d
Expand Down Expand Up @@ -62,5 +72,30 @@ def default(self, obj):
'every': obj.run_every.total_seconds(),
'relative': bool(obj.relative),
}
if isinstance(obj, rrule):
# Convert datetime objects to timestamps
dtstart_ts = time.mktime(obj.dtstart.timetuple()) \
if obj.dtstart else None
until_ts = time.mktime(obj.until.timetuple()) \
if obj.until else None

return {
'__type__': 'rrule',
'freq': obj.freq,
'dtstart': dtstart_ts,
'interval': obj.interval,
'wkst': obj.wkst,
'count': obj.count,
'until': until_ts,
'bysetpos': obj.bysetpos,
'bymonth': obj.bymonth,
'bymonthday': obj.bymonthday,
'byyearday': obj.byyearday,
'byeaster': obj.byeaster,
'byweekno': obj.byweekno,
'byweekday': obj.byweekday,
'byhour': obj.byhour,
'byminute': obj.byminute,
'bysecond': obj.bysecond
}
return super(RedBeatJSONEncoder, self).default(obj)
15 changes: 12 additions & 3 deletions redbeat/schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ def due_at(self):
return self._default_now()

delta = self.schedule.remaining_estimate(self.last_run_at)
# if no delta, means no more events after the last_run_at.
if not delta:
return None

# overdue => due now
if delta.total_seconds() < 0:
Expand All @@ -189,6 +192,8 @@ def key(self):

@property
def score(self):
if not self.due_at:
return -1
return to_timestamp(self.due_at)

@property
Expand Down Expand Up @@ -321,18 +326,22 @@ def schedule(self):
client = redis(self.app)

with client.pipeline() as pipe:
pipe.zrangebyscore(self.app.redbeat_conf.schedule_key, 0, max_due_at)
pipe.zrangebyscore(self.app.redbeat_conf.schedule_key, -1, max_due_at, withscores=True)

# peek into the next tick to accuratly calculate sleep between ticks
pipe.zrangebyscore(self.app.redbeat_conf.schedule_key,
'({}'.format(max_due_at),
max_due_at + self.max_interval,
start=0, num=1)
start=-1, num=1, withscores=True)
due_tasks, maybe_due = pipe.execute()

logger.info('Loading %d tasks', len(due_tasks) + len(maybe_due))
d = {}
for key in due_tasks + maybe_due:
for key, score in due_tasks + maybe_due:
if score < 0:
logger.info('removing ended schedule %s', key)
client.zrem(self.app.redbeat_conf.schedule_key, key)
continue
try:
entry = self.Entry.from_key(key, app=self.app)
except KeyError:
Expand Down
74 changes: 74 additions & 0 deletions redbeat/schedules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import celery
from dateutil.rrule import rrule as dateutil_rrule
from celery.schedules import BaseSchedule
from celery.utils.time import (
localize,
timezone
)


class rrule(BaseSchedule):
RRULE_REPR = """\
<rrule: {0._freq} {0._interval}>\
"""

def __init__(self, freq, dtstart=None,
interval=1, wkst=None, count=None, until=None, bysetpos=None,
bymonth=None, bymonthday=None, byyearday=None, byeaster=None,
byweekno=None, byweekday=None,
byhour=None, byminute=None, bysecond=None,
**kwargs):
self.freq = freq
self.dtstart = dtstart
self.interval = interval
self.wkst = wkst
self.count = count
self.until = until
self.bysetpos = bysetpos
self.bymonth = bymonth
self.bymonthday = bymonthday
self.byyearday = byyearday
self.byeaster = byeaster
self.byweekno = byweekno
self.byweekday = byweekday
self.byhour = byhour
self.byminute = byminute
self.bysecond = bysecond
self.rrule = dateutil_rrule(freq, dtstart, interval, wkst, count, until,
bysetpos, bymonth, bymonthday, byyearday, byeaster,
byweekno, byweekday, byhour, byminute, bysecond)
super(rrule, self).__init__(**kwargs)

def remaining_estimate(self, last_run_at):
last_run_at = self.maybe_make_aware(last_run_at)
last_run_at_utc = localize(last_run_at, timezone.utc)
print last_run_at
print last_run_at_utc
last_run_at_utc_naive = last_run_at_utc.replace(tzinfo=None)
next_run_utc = self.rrule.after(last_run_at_utc_naive)
if next_run_utc:
next = self.maybe_make_aware(next_run_utc)
now = self.maybe_make_aware(self.now())
delta = next - now
return delta
return None

def is_due(self, last_run_at):
rem_delta = self.remaining_estimate(last_run_at)
if rem_delta:
rem = max(rem_delta.total_seconds(), 0)
due = rem == 0
if due:
rem_delta = self.remaining_estimate(self.now())
if rem_delta:
rem = max(rem_delta.total_seconds(), 0)
else:
rem = 0
return celery.schedules.schedstate(due, rem)
return celery.schedules.schedstate(False, None)

def __repr__(self):
return self.RRULE_REPR.format(self)

def __reduce__(self):
return (self.__class__, (self.rrule), None)
1 change: 1 addition & 0 deletions requirements.dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ pytest-catchlog
pytest-cov
redis
toX
python-dateutil
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
install_requires=[
'redis',
'celery',
'python-dateutil'
],
tests_require=[
'pytest',
Expand Down

0 comments on commit cf3fd9e

Please sign in to comment.