Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to RangeByMinutes backfilling tasks #1863

Merged
merged 6 commits into from
Sep 26, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions luigi/tools/range.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,84 @@ def _format_datetime(self, dt):
return luigi.DateHourParameter().serialize(dt)


class RangeByMinutesBase(RangeBase):
"""
Produces a contiguous completed range of an recurring tasks separated a specified number of minutes.
"""
start = luigi.DateMinuteParameter(
default=None,
description="beginning date-hour-minute, inclusive. Default: None - work backward forever (requires reverse=True)")
stop = luigi.DateMinuteParameter(
default=None,
description="ending date-hour-minute, exclusive. Default: None - work forward forever")
minutes_back = luigi.IntParameter(
default=60*24, # one day
description=("extent to which contiguousness is to be assured into "
"past, in minutes from current time. Prevents infinite "
"loop when start is none. If the dataset has limited "
"retention (i.e. old outputs get removed), this should "
"be set shorter to that, too, to prevent the oldest "
"outputs flapping. Increase freely if you intend to "
"process old dates - worker's memory is the limit"))
minutes_forward = luigi.IntParameter(
default=0,
description="extent to which contiguousness is to be assured into future, "
"in minutes from current time. Prevents infinite loop when stop is none")

minutes_interval = luigi.IntParameter(
default=5,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems way to arbitrary. Why not just default=1?

Copy link
Contributor Author

@j-santander j-santander Sep 26, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Tarrasch, thanks for reviewing it.
Not sure what you mean with your first comment, is it just the choice of name of RangeByMinutes vs RangeMinutely? English is not my mother tongue, but minutely didn't sound correct to me. Please suggest a name that you think is best.

About the default value of 5 minutes interval. Ultimately it responds to the fact it is the interval that was needed in our project. I could justify it as being a reasonable interval in the minutes range, making every minute a too frequent interval... but it is not trouble to change it.

description="separation between events in minutes"
)

def datetime_to_parameter(self, dt):
return dt

def parameter_to_datetime(self, p):
return p

def datetime_to_parameters(self, dt):
"""
Given a date-time, will produce a dictionary of of-params combined with the ranged task parameter
"""
return self._task_parameters(dt)

def parameters_to_datetime(self, p):
"""
Given a dictionary of parameters, will extract the ranged task parameter value
"""
dt = p[self._param_name]
return datetime(dt.year, dt.month, dt.day, dt.hour, dt.minute)

def moving_start(self, now):
return now - timedelta(minutes=self.minutes_back)

def moving_stop(self, now):
return now + timedelta(minutes=self.minutes_forward)

def finite_datetimes(self, finite_start, finite_stop):
"""
Simply returns the points in time that correspond to a whole number of minutes intervals.
"""
# start of a complete interval, e.g. 20:13 and the interval is 5 -> 20:10
start_minute = int(finite_start.minute/self.minutes_interval)*self.minutes_interval
datehour_start = datetime(
year=finite_start.year,
month=finite_start.month,
day=finite_start.day,
hour=finite_start.hour,
minute=start_minute)
datehours = []
for i in itertools.count():
t = datehour_start + timedelta(minutes=i*self.minutes_interval)
if t >= finite_stop:
return datehours
if t >= finite_start:
datehours.append(t)

def _format_datetime(self, dt):
return luigi.DateMinuteParameter().serialize(dt)


def _constrain_glob(glob, paths, limit=5):
"""
Tweaks glob into a list of more specific globs that together still cover paths and not too much extra.
Expand Down Expand Up @@ -613,3 +691,31 @@ def missing_datetimes(self, finite_datetimes):
finite_datetimes,
lambda d: self._instantiate_task_cls(self.datetime_to_parameter(d)),
lambda d: d.strftime('(%Y).*(%m).*(%d).*(%H)'))


class RangeByMinutes(RangeByMinutesBase):
"""Efficiently produces a contiguous completed range of an recurring
task every interval minutes that takes a single DateMinuteParameter.

Benefits from bulk_complete information to efficiently cover gaps.

Falls back to infer it from output filesystem listing to facilitate the
common case usage.

Convenient to use even from command line, like:

.. code-block:: console

luigi --module your.module RangeByMinutes --of YourActualTask --start 2014-01-01T0123
"""

def missing_datetimes(self, finite_datetimes):
try:
cls_with_params = functools.partial(self.of, **self.of_params)
complete_parameters = self.of.bulk_complete.__func__(cls_with_params, map(self.datetime_to_parameter, finite_datetimes))
return set(finite_datetimes) - set(map(self.parameter_to_datetime, complete_parameters))
except NotImplementedError:
return infer_bulk_complete_from_fs(
finite_datetimes,
lambda d: self._instantiate_task_cls(self.datetime_to_parameter(d)),
lambda d: d.strftime('(%Y).*(%m).*(%d).*(%H).*(%M)'))
Loading