diff --git a/src/croniter/croniter.py b/src/croniter/croniter.py index 38186b7..452143e 100644 --- a/src/croniter/croniter.py +++ b/src/croniter/croniter.py @@ -52,7 +52,7 @@ def is_32bit(): if bits == 32: is_32 = True - elif architecture and '32' in architecture: + elif architecture and "32" in architecture: is_32 = True elif is_small_maxsize: is_32 = True @@ -75,23 +75,23 @@ def is_32bit(): EPOCH = datetime.datetime.fromtimestamp(0) -M_ALPHAS = {'jan': 1, 'feb': 2, 'mar': 3, 'apr': 4, 'may': 5, 'jun': 6, - 'jul': 7, 'aug': 8, 'sep': 9, 'oct': 10, 'nov': 11, 'dec': 12} -DOW_ALPHAS = {'sun': 0, 'mon': 1, 'tue': 2, 'wed': 3, 'thu': 4, 'fri': 5, 'sat': 6} +M_ALPHAS = {"jan": 1, "feb": 2, "mar": 3, "apr": 4, "may": 5, "jun": 6, + "jul": 7, "aug": 8, "sep": 9, "oct": 10, "nov": 11, "dec": 12} +DOW_ALPHAS = {"sun": 0, "mon": 1, "tue": 2, "wed": 3, "thu": 4, "fri": 5, "sat": 6} -step_search_re = re.compile(r'^([^-]+)-([^-/]+)(/(\d+))?$') -only_int_re = re.compile(r'^\d+$') +step_search_re = re.compile(r"^([^-]+)-([^-/]+)(/(\d+))?$") +only_int_re = re.compile(r"^\d+$") -WEEKDAYS = '|'.join(DOW_ALPHAS.keys()) -MONTHS = '|'.join(M_ALPHAS.keys()) -star_or_int_re = re.compile(r'^(\d+|\*)$') +WEEKDAYS = "|".join(DOW_ALPHAS.keys()) +MONTHS = "|".join(M_ALPHAS.keys()) +star_or_int_re = re.compile(r"^(\d+|\*)$") special_dow_re = re.compile( - (r'^(?P
((?P(({WEEKDAYS})(-({WEEKDAYS}))?)').format(WEEKDAYS=WEEKDAYS) + - (r'|(({MONTHS})(-({MONTHS}))?)|\w+)#)|l)(?P \d+)$').format(MONTHS=MONTHS) + (r"^(?P ((?P(({WEEKDAYS})(-({WEEKDAYS}))?)").format(WEEKDAYS=WEEKDAYS) + + (r"|(({MONTHS})(-({MONTHS}))?)|\w+)#)|l)(?P \d+)$").format(MONTHS=MONTHS) ) -re_star = re.compile('[*]') +re_star = re.compile("[*]") hash_expression_re = re.compile( - r'^(?P h|r)(\((?P \d+)-(?P \d+)\))?(\/(?P \d+))?$' + r"^(?P h|r)(\((?P \d+)-(?P \d+)\))?(\/(?P \d+))?$" ) MINUTE_FIELD = 0 HOUR_FIELD = 1 @@ -104,9 +104,9 @@ def is_32bit(): SECOND_FIELDS = (MINUTE_FIELD, HOUR_FIELD, DAY_FIELD, MONTH_FIELD, DOW_FIELD, SECOND_FIELD) YEAR_FIELDS = (MINUTE_FIELD, HOUR_FIELD, DAY_FIELD, MONTH_FIELD, DOW_FIELD, SECOND_FIELD, YEAR_FIELD) CRON_FIELDS = { - 'unix': UNIX_FIELDS, - 'second': SECOND_FIELDS, - 'year': YEAR_FIELDS, + "unix": UNIX_FIELDS, + "second": SECOND_FIELDS, + "year": YEAR_FIELDS, len(UNIX_FIELDS): UNIX_FIELDS, len(SECOND_FIELDS): SECOND_FIELDS, len(YEAR_FIELDS): YEAR_FIELDS, @@ -229,9 +229,9 @@ def __init__(self, expr_format, start_time=None, ret_type=float, if hash_id: if not isinstance(hash_id, (bytes, str)): - raise TypeError('hash_id must be bytes or UTF-8 string') + raise TypeError("hash_id must be bytes or UTF-8 string") if not isinstance(hash_id, bytes): - hash_id = hash_id.encode('UTF-8') + hash_id = hash_id.encode("UTF-8") self._max_years_btw_matches_explicitly_set = ( max_years_between_matches is not None) @@ -348,7 +348,7 @@ def _get_next(self, ret_type=None, start_time=None, is_prev=None, update_current # exception to support day of month and day of week as defined in cron dom_dow_exception_processed = False - if (expanded[DAY_FIELD][0] != '*' and expanded[DOW_FIELD][0] != '*') and self._day_or: + if (expanded[DAY_FIELD][0] != "*" and expanded[DOW_FIELD][0] != "*") and self._day_or: # If requested, handle a bug in vixie cron/ISC cron where day_of_month and day_of_week form # an intersection (AND) instead of a union (OR) if either field is an asterisk or starts with an asterisk # (https://crontab.guru/cron-bug.html) @@ -361,10 +361,10 @@ def _get_next(self, ret_type=None, start_time=None, is_prev=None, update_current pass else: bak = expanded[DOW_FIELD] - expanded[DOW_FIELD] = ['*'] + expanded[DOW_FIELD] = ["*"] t1 = self._calc(self.cur, expanded, nth_weekday_of_month, is_prev) expanded[DOW_FIELD] = bak - expanded[DAY_FIELD] = ['*'] + expanded[DAY_FIELD] = ["*"] t2 = self._calc(self.cur, expanded, nth_weekday_of_month, is_prev) if not is_prev: @@ -414,7 +414,7 @@ def _get_next(self, ret_type=None, start_time=None, is_prev=None, update_current return result # iterator protocol, to enable direct use of croniter - # objects in a loop, like "for dt in croniter('5 0 * * *'): ..." + # objects in a loop, like "for dt in croniter("5 0 * * *'): ..." # or for combining multiple croniters into single # dates feed using 'itertools' module def all_next(self, ret_type=None, start_time=None, update_current=None): @@ -499,7 +499,7 @@ def proc_year(d): def proc_month(d): try: - expanded[MONTH_FIELD].index('*') + expanded[MONTH_FIELD].index("*") except ValueError: diff_month = nearest_diff_method( d.month, expanded[MONTH_FIELD], self.MONTHS_IN_YEAR) @@ -521,12 +521,12 @@ def proc_month(d): def proc_day_of_month(d): try: - expanded[DAY_FIELD].index('*') + expanded[DAY_FIELD].index("*") except ValueError: days = DAYS[month - 1] if month == 2 and self.is_leap(year) is True: days += 1 - if 'l' in expanded[DAY_FIELD] and days == d.day: + if "l" in expanded[DAY_FIELD] and days == d.day: return False, d if is_prev: @@ -549,7 +549,7 @@ def proc_day_of_month(d): def proc_day_of_week(d): try: - expanded[DOW_FIELD].index('*') + expanded[DOW_FIELD].index("*") except ValueError: diff_day_of_week = nearest_diff_method( d.isoweekday() % 7, expanded[DOW_FIELD], 7) @@ -564,14 +564,14 @@ def proc_day_of_week(d): return False, d def proc_day_of_week_nth(d): - if '*' in nth_weekday_of_month: - s = nth_weekday_of_month['*'] + if "*" in nth_weekday_of_month: + s = nth_weekday_of_month["*"] for i in range(0, 7): if i in nth_weekday_of_month: nth_weekday_of_month[i].update(s) else: nth_weekday_of_month[i] = s - del nth_weekday_of_month['*'] + del nth_weekday_of_month["*"] candidates = [] for wday, nth in nth_weekday_of_month.items(): @@ -615,7 +615,7 @@ def proc_day_of_week_nth(d): def proc_hour(d): try: - expanded[HOUR_FIELD].index('*') + expanded[HOUR_FIELD].index("*") except ValueError: diff_hour = nearest_diff_method(d.hour, expanded[HOUR_FIELD], 24) if diff_hour is not None and diff_hour != 0: @@ -629,7 +629,7 @@ def proc_hour(d): def proc_minute(d): try: - expanded[MINUTE_FIELD].index('*') + expanded[MINUTE_FIELD].index("*") except ValueError: diff_min = nearest_diff_method(d.minute, expanded[MINUTE_FIELD], 60) if diff_min is not None and diff_min != 0: @@ -643,7 +643,7 @@ def proc_minute(d): def proc_second(d): if len(expanded) > UNIX_CRON_LEN: try: - expanded[SECOND_FIELD].index('*') + expanded[SECOND_FIELD].index("*") except ValueError: diff_sec = nearest_diff_method(d.second, expanded[SECOND_FIELD], 60) if diff_sec is not None and diff_sec != 0: @@ -734,9 +734,9 @@ def _get_prev_nearest_diff(x, to_check, range_val): candidates = to_check[:] candidates.reverse() for d in candidates: - if d != 'l' and d <= x: + if d != "l" and d <= x: return d - x - if 'l' in candidates: + if "l" in candidates: return -x # When range_val is None and x not exists in to_check, # `None` will be returned to suggest no more available time @@ -798,13 +798,13 @@ def _expand(cls, expr_format, hash_id=None, second_at_beginning=False, from_time # etc. Keep expr_format untouched so we can use it in the exception # messages. expr_aliases = { - '@midnight': ('0 0 * * *', 'h h(0-2) * * * h'), - '@hourly': ('0 * * * *', 'h * * * * h'), - '@daily': ('0 0 * * *', 'h h * * * h'), - '@weekly': ('0 0 * * 0', 'h h * * h h'), - '@monthly': ('0 0 1 * *', 'h h h * * h'), - '@yearly': ('0 0 1 1 *', 'h h h h * h'), - '@annually': ('0 0 1 1 *', 'h h h h * h'), + "@midnight": ("0 0 * * *", "h h(0-2) * * * h"), + "@hourly": ("0 * * * *", "h * * * * h"), + "@daily": ("0 0 * * *", "h h * * * h"), + "@weekly": ("0 0 * * 0", "h h * * h h"), + "@monthly": ("0 0 1 * *", "h h h * * h"), + "@yearly": ("0 0 1 1 *", "h h h h * h"), + "@annually": ("0 0 1 1 *", "h h h h * h"), } efl = expr_format.lower() @@ -842,7 +842,7 @@ def _expand(cls, expr_format, hash_id=None, second_at_beginning=False, from_time # currently just trade `?` as `*` expr = "*" - e_list = expr.split(',') + e_list = expr.split(",") res = [] while len(e_list) > 0: @@ -854,7 +854,7 @@ def _expand(cls, expr_format, hash_id=None, second_at_beginning=False, from_time special_dow_rem = special_dow_re.match(str(e)) if special_dow_rem: g = special_dow_rem.groupdict() - he, last = g.get('he', ''), g.get('last', '') + he, last = g.get("he", ""), g.get("last", "") if he: e = he try: @@ -866,11 +866,11 @@ def _expand(cls, expr_format, hash_id=None, second_at_beginning=False, from_time "value: '{1}'".format(expr_format, nth)) elif last: e = last - nth = g['pre'] # 'l' + nth = g["pre"] # 'l' # Before matching step_search_re, normalize "*" to "{min}-{max}". # Example: in the minute field, "*/5" normalizes to "0-59/5" - t = re.sub(r'^\*(\/.+)$', r'%d-%d\1' % ( + t = re.sub(r"^\*(\/.+)$", r"%d-%d\1" % ( cls.RANGES[field_index][0], cls.RANGES[field_index][1]), str(e)) @@ -880,7 +880,7 @@ def _expand(cls, expr_format, hash_id=None, second_at_beginning=False, from_time # Before matching step_search_re, # normalize "{start}/{step}" to "{start}-{max}/{step}". # Example: in the minute field, "10/5" normalizes to "10-59/5" - t = re.sub(r'^(.+)\/(.+)$', r'\1-%d/\2' % ( + t = re.sub(r"^(.+)\/(.+)$", r"\1-%d/\2" % ( cls.RANGES[field_index][1]), str(e)) m = step_search_re.search(t) @@ -888,8 +888,8 @@ def _expand(cls, expr_format, hash_id=None, second_at_beginning=False, from_time if m: # early abort if low/high are out of bounds (low, high, step) = m.group(1), m.group(2), m.group(4) or 1 - if field_index == DAY_FIELD and high == 'l': - high = '31' + if field_index == DAY_FIELD and high == "l": + high = "31" if not only_int_re.search(low): low = "{0}".format(cls._alphaconv(field_index, low, expressions)) @@ -944,13 +944,13 @@ def _expand(cls, expr_format, hash_id=None, second_at_beginning=False, from_time try: rng = list(range(low, high + 1, step)) except ValueError as exc: - raise CroniterBadCronError('invalid range: {0}'.format(exc)) + raise CroniterBadCronError("invalid range: {0}".format(exc)) rng = (["{0}#{1}".format(item, nth) for item in rng] if field_index == DOW_FIELD and nth and nth != "l" else rng) e_list += [a for a in rng if a not in e_list] else: - if t.startswith('-'): + if t.startswith("-"): raise CroniterBadCronError(( "[{0}] is not acceptable," "negative numbers not allowed" @@ -986,15 +986,15 @@ def _expand(cls, expr_format, hash_id=None, second_at_beginning=False, from_time if len(res) == cls.LEN_MEANS_ALL[field_index]: # Make sure the wildcard is used in the correct way (avoid over-optimization) if ( - (field_index == DAY_FIELD and '*' not in expressions[DOW_FIELD]) or - (field_index == DOW_FIELD and '*' not in expressions[DAY_FIELD]) + (field_index == DAY_FIELD and "*" not in expressions[DOW_FIELD]) or + (field_index == DOW_FIELD and "*" not in expressions[DAY_FIELD]) ): pass else: - res = ['*'] + res = ["*"] - expanded.append(['*'] if (len(res) == 1 - and res[0] == '*') + expanded.append(["*"] if (len(res) == 1 + and res[0] == "*") else res) # Check to make sure the dow combo in use is supported @@ -1081,11 +1081,11 @@ def _get_low_from_current_date_number(cls, field_index, step, from_timestamp): raise ValueError("Can't get current date number for index larger than 4") @classmethod - def is_valid(cls, expression, hash_id=None, encoding='UTF-8', + def is_valid(cls, expression, hash_id=None, encoding="UTF-8", second_at_beginning=False): if hash_id: if not isinstance(hash_id, (bytes, str)): - raise TypeError('hash_id must be bytes or UTF-8 string') + raise TypeError("hash_id must be bytes or UTF-8 string") if not isinstance(hash_id, bytes): hash_id = hash_id.encode(encoding) try: @@ -1192,7 +1192,7 @@ def do(self, idx, hash_type="h", hash_id=None, range_end=None, range_begin=None) range_end = self.cron.RANGES[idx][1] if range_begin is None: range_begin = self.cron.RANGES[idx][0] - if hash_type == 'r': + if hash_type == "r": crc = random.randint(0, 0xFFFFFFFF) else: crc = binascii.crc32(hash_id) & 0xFFFFFFFF @@ -1201,75 +1201,75 @@ def do(self, idx, hash_type="h", hash_id=None, range_end=None, range_begin=None) def match(self, efl, idx, expr, hash_id=None, **kw): return hash_expression_re.match(expr) - def expand(self, efl, idx, expr, hash_id=None, match='', **kw): + def expand(self, efl, idx, expr, hash_id=None, match="", **kw): """Expand a hashed/random expression to its normal representation""" - if match == '': + if match == "": match = self.match(efl, idx, expr, hash_id, **kw) if not match: return expr m = match.groupdict() - if m['hash_type'] == 'h' and hash_id is None: - raise CroniterBadCronError('Hashed definitions must include hash_id') + if m["hash_type"] == "h" and hash_id is None: + raise CroniterBadCronError("Hashed definitions must include hash_id") - if m['range_begin'] and m['range_end']: - if int(m['range_begin']) >= int(m['range_end']): - raise CroniterBadCronError('Range end must be greater than range begin') + if m["range_begin"] and m["range_end"]: + if int(m["range_begin"]) >= int(m["range_end"]): + raise CroniterBadCronError("Range end must be greater than range begin") - if m['range_begin'] and m['range_end'] and m['divisor']: + if m["range_begin"] and m["range_end"] and m["divisor"]: # Example: H(30-59)/10 -> 34-59/10 (i.e. 34,44,54) if int(m["divisor"]) == 0: raise CroniterBadCronError("Bad expression: {0}".format(expr)) - return '{0}-{1}/{2}'.format( + return "{0}-{1}/{2}".format( self.do( idx, - hash_type=m['hash_type'], + hash_type=m["hash_type"], hash_id=hash_id, - range_begin=int(m['range_begin']), - range_end=int(m['divisor']) - 1 + int(m['range_begin']), + range_begin=int(m["range_begin"]), + range_end=int(m["divisor"]) - 1 + int(m["range_begin"]), ), - int(m['range_end']), - int(m['divisor']), + int(m["range_end"]), + int(m["divisor"]), ) - elif m['range_begin'] and m['range_end']: + elif m["range_begin"] and m["range_end"]: # Example: H(0-29) -> 12 return str( self.do( idx, - hash_type=m['hash_type'], + hash_type=m["hash_type"], hash_id=hash_id, - range_end=int(m['range_end']), - range_begin=int(m['range_begin']), + range_end=int(m["range_end"]), + range_begin=int(m["range_begin"]), ) ) - elif m['divisor']: + elif m["divisor"]: # Example: H/15 -> 7-59/15 (i.e. 7,22,37,52) if int(m["divisor"]) == 0: raise CroniterBadCronError("Bad expression: {0}".format(expr)) - return '{0}-{1}/{2}'.format( + return "{0}-{1}/{2}".format( self.do( idx, - hash_type=m['hash_type'], + hash_type=m["hash_type"], hash_id=hash_id, range_begin=self.cron.RANGES[idx][0], - range_end=int(m['divisor']) - 1 + self.cron.RANGES[idx][0], + range_end=int(m["divisor"]) - 1 + self.cron.RANGES[idx][0], ), self.cron.RANGES[idx][1], - int(m['divisor']), + int(m["divisor"]), ) else: # Example: H -> 32 return str( self.do( idx, - hash_type=m['hash_type'], + hash_type=m["hash_type"], hash_id=hash_id, ) ) EXPANDERS = OrderedDict([ - ('hash', HashExpander), + ("hash", HashExpander), ])