diff --git a/luigi/tools/range.py b/luigi/tools/range.py old mode 100644 new mode 100755 index ba1c83cf2d..eaf58af33f --- a/luigi/tools/range.py +++ b/luigi/tools/range.py @@ -402,6 +402,89 @@ def _format_datetime(self, dt): return luigi.DateHourParameter().serialize(dt) +class RangeByMinutesBase(RangeBase): + """ + Produces a contiguous completed range of an recurring tasks separated a specified number of minutes. + """ + start = luigi.DateMinuteParameter( + default=None, + description="beginning date-hour-minute, inclusive. Default: None - work backward forever (requires reverse=True)") + stop = luigi.DateMinuteParameter( + default=None, + description="ending date-hour-minute, exclusive. Default: None - work forward forever") + minutes_back = luigi.IntParameter( + default=60*24, # one day + description=("extent to which contiguousness is to be assured into " + "past, in minutes from current time. Prevents infinite " + "loop when start is none. If the dataset has limited " + "retention (i.e. old outputs get removed), this should " + "be set shorter to that, too, to prevent the oldest " + "outputs flapping. Increase freely if you intend to " + "process old dates - worker's memory is the limit")) + minutes_forward = luigi.IntParameter( + default=0, + description="extent to which contiguousness is to be assured into future, " + "in minutes from current time. Prevents infinite loop when stop is none") + + minutes_interval = luigi.IntParameter( + default=1, + description="separation between events in minutes. It must evenly divide 60" + ) + + def datetime_to_parameter(self, dt): + return dt + + def parameter_to_datetime(self, p): + return p + + def datetime_to_parameters(self, dt): + """ + Given a date-time, will produce a dictionary of of-params combined with the ranged task parameter + """ + return self._task_parameters(dt) + + def parameters_to_datetime(self, p): + """ + Given a dictionary of parameters, will extract the ranged task parameter value + """ + dt = p[self._param_name] + return datetime(dt.year, dt.month, dt.day, dt.hour, dt.minute) + + def moving_start(self, now): + return now - timedelta(minutes=self.minutes_back) + + def moving_stop(self, now): + return now + timedelta(minutes=self.minutes_forward) + + def finite_datetimes(self, finite_start, finite_stop): + """ + Simply returns the points in time that correspond to a whole number of minutes intervals. + """ + # Validate that the minutes_interval can divide 60 and it is greater than 0 and lesser than 60 + if not (0 < self.minutes_interval < 60): + raise ParameterException('minutes-interval must be within 0..60') + if (60 / self.minutes_interval) * self.minutes_interval != 60: + raise ParameterException('minutes-interval does not evenly divide 60') + # start of a complete interval, e.g. 20:13 and the interval is 5 -> 20:10 + start_minute = int(finite_start.minute/self.minutes_interval)*self.minutes_interval + datehour_start = datetime( + year=finite_start.year, + month=finite_start.month, + day=finite_start.day, + hour=finite_start.hour, + minute=start_minute) + datehours = [] + for i in itertools.count(): + t = datehour_start + timedelta(minutes=i*self.minutes_interval) + if t >= finite_stop: + return datehours + if t >= finite_start: + datehours.append(t) + + def _format_datetime(self, dt): + return luigi.DateMinuteParameter().serialize(dt) + + def _constrain_glob(glob, paths, limit=5): """ Tweaks glob into a list of more specific globs that together still cover paths and not too much extra. @@ -613,3 +696,31 @@ def missing_datetimes(self, finite_datetimes): finite_datetimes, lambda d: self._instantiate_task_cls(self.datetime_to_parameter(d)), lambda d: d.strftime('(%Y).*(%m).*(%d).*(%H)')) + + +class RangeByMinutes(RangeByMinutesBase): + """Efficiently produces a contiguous completed range of an recurring + task every interval minutes that takes a single DateMinuteParameter. + + Benefits from bulk_complete information to efficiently cover gaps. + + Falls back to infer it from output filesystem listing to facilitate the + common case usage. + + Convenient to use even from command line, like: + + .. code-block:: console + + luigi --module your.module RangeByMinutes --of YourActualTask --start 2014-01-01T0123 + """ + + def missing_datetimes(self, finite_datetimes): + try: + cls_with_params = functools.partial(self.of, **self.of_params) + complete_parameters = self.of.bulk_complete.__func__(cls_with_params, map(self.datetime_to_parameter, finite_datetimes)) + return set(finite_datetimes) - set(map(self.parameter_to_datetime, complete_parameters)) + except NotImplementedError: + return infer_bulk_complete_from_fs( + finite_datetimes, + lambda d: self._instantiate_task_cls(self.datetime_to_parameter(d)), + lambda d: d.strftime('(%Y).*(%m).*(%d).*(%H).*(%M)')) diff --git a/test/range_test.py b/test/range_test.py old mode 100644 new mode 100755 index 2a64af04da..9d56a24dfa --- a/test/range_test.py +++ b/test/range_test.py @@ -22,8 +22,17 @@ import luigi import mock from luigi.mock import MockTarget, MockFileSystem -from luigi.tools.range import (RangeDaily, RangeDailyBase, RangeEvent, RangeHourly, RangeHourlyBase, _constrain_glob, - _get_filesystems_and_globs) +from luigi.tools.range import (RangeDaily, RangeDailyBase, RangeEvent, + RangeHourly, RangeHourlyBase, + RangeByMinutes, RangeByMinutesBase, + _constrain_glob, _get_filesystems_and_globs) + + +class CommonDateMinuteTask(luigi.Task): + dh = luigi.DateMinuteParameter() + + def output(self): + return MockTarget(self.dh.strftime('/n2000y01a05n/%Y_%m-_-%daww/21mm%H%Mdara21/ooo')) class CommonDateHourTask(luigi.Task): @@ -118,6 +127,36 @@ def requires(self): yield TaskB(dh=self.dh, complicator='no/worries') # str(self.dh) would complicate beyond working +class TaskMinutesA(luigi.Task): + dm = luigi.DateMinuteParameter() + + def output(self): + return MockTarget(self.dm.strftime('TaskA/%Y-%m-%d/%H%M')) + + +class TaskMinutesB(luigi.Task): + dm = luigi.DateMinuteParameter() + complicator = luigi.Parameter() + + def output(self): + return MockTarget(self.dm.strftime('TaskB/%%s%Y-%m-%d/%H%M') % self.complicator) + + +class TaskMinutesC(luigi.Task): + dm = luigi.DateMinuteParameter() + + def output(self): + return MockTarget(self.dm.strftime('not/a/real/path/%Y-%m-%d/%H%M')) + + +class CommonWrapperTaskMinutes(luigi.WrapperTask): + dm = luigi.DateMinuteParameter() + + def requires(self): + yield TaskMinutesA(dm=self.dm) + yield TaskMinutesB(dm=self.dm, complicator='no/worries') # str(self.dh) would complicate beyond working + + def mock_listdir(contents): def contents_listdir(_, glob): for path in fnmatch.filter(contents, glob + '*'): @@ -466,6 +505,241 @@ def test_start_long_before_long_hours_back_and_with_long_hours_forward(self): ) +class RangeByMinutesBaseTest(unittest.TestCase): + maxDiff = None + + def setUp(self): + # yucky to create separate callbacks; would be nicer if the callback + # received an instance of a subclass of Event, so one callback could + # accumulate all types + @RangeByMinutesBase.event_handler(RangeEvent.DELAY) + def callback_delay(*args): + self.events.setdefault(RangeEvent.DELAY, []).append(args) + + @RangeByMinutesBase.event_handler(RangeEvent.COMPLETE_COUNT) + def callback_complete_count(*args): + self.events.setdefault(RangeEvent.COMPLETE_COUNT, []).append(args) + + @RangeByMinutesBase.event_handler(RangeEvent.COMPLETE_FRACTION) + def callback_complete_fraction(*args): + self.events.setdefault(RangeEvent.COMPLETE_FRACTION, []).append(args) + + self.events = {} + + def test_consistent_formatting(self): + task = RangeByMinutesBase(of=CommonDateMinuteTask, + start=datetime.datetime(2016, 1, 1, 13), + minutes_interval=5) + self.assertEqual(task._format_range( + [datetime.datetime(2016, 1, 2, 13, 10), datetime.datetime(2016, 2, 29, 23, 20)]), + '[2016-01-02T1310, 2016-02-29T2320]') + + def _empty_subcase(self, kwargs, expected_events): + calls = [] + + class RangeByMinutesDerived(RangeByMinutesBase): + def missing_datetimes(a, b, c): + args = [a, b, c] + calls.append(args) + return args[-1][:5] + + task = RangeByMinutesDerived(of=CommonDateMinuteTask, **kwargs) + self.assertEqual(task.requires(), []) + self.assertEqual(calls, []) + self.assertEqual(task.requires(), []) + self.assertEqual(calls, []) # subsequent requires() should return the cached result, never call missing_datetimes + self.assertEqual(self.events, expected_events) + self.assertTrue(task.complete()) + + def test_start_after_minutes_forward(self): + # nothing to do because start is later + self._empty_subcase( + { + 'now': datetime_to_epoch(datetime.datetime(2000, 1, 1, 4)), + 'start': datetime.datetime(2014, 3, 20, 17, 10), + 'minutes_back': 4, + 'minutes_forward': 20, + 'minutes_interval': 5, + }, + { + 'event.tools.range.delay': [ + ('CommonDateMinuteTask', 0), + ], + 'event.tools.range.complete.count': [ + ('CommonDateMinuteTask', 0), + ], + 'event.tools.range.complete.fraction': [ + ('CommonDateMinuteTask', 1.), + ], + } + ) + + def _nonempty_subcase(self, kwargs, expected_finite_datetimes_range, expected_requires, expected_events): + calls = [] + + class RangeByMinutesDerived(RangeByMinutesBase): + def missing_datetimes(a, b, c): + args = [a, b, c] + calls.append(args) + return args[-1][:7] + + task = RangeByMinutesDerived(of=CommonDateMinuteTask, **kwargs) + self.assertEqual(list(map(str, task.requires())), expected_requires) + self.assertEqual(calls[0][1], CommonDateMinuteTask) + self.assertEqual((min(calls[0][2]), max(calls[0][2])), expected_finite_datetimes_range) + self.assertEqual(list(map(str, task.requires())), expected_requires) + self.assertEqual(len(calls), 1) # subsequent requires() should return the cached result, not call missing_datetimes again + self.assertEqual(self.events, expected_events) + self.assertFalse(task.complete()) + + def test_negative_interval(self): + class SomeByMinutesTask(luigi.Task): + d = luigi.DateMinuteParameter() + + def output(self): + return MockTarget(self.d.strftime('/data/2014/p/v/z/%Y_/_%m-_-%doctor/20/%HZ%MOOO')) + + task = RangeByMinutes(now=datetime_to_epoch(datetime.datetime(2016, 4, 1)), + of=SomeByMinutesTask, + start=datetime.datetime(2014, 3, 20, 17), + minutes_interval=-1) + self.assertRaises(luigi.parameter.ParameterException, task.requires) + + def test_non_dividing_interval(self): + class SomeByMinutesTask(luigi.Task): + d = luigi.DateMinuteParameter() + + def output(self): + return MockTarget(self.d.strftime('/data/2014/p/v/z/%Y_/_%m-_-%doctor/20/%HZ%MOOO')) + + task = RangeByMinutes(now=datetime_to_epoch(datetime.datetime(2016, 4, 1)), + of=SomeByMinutesTask, + start=datetime.datetime(2014, 3, 20, 17), + minutes_interval=8) + self.assertRaises(luigi.parameter.ParameterException, task.requires) + + def test_start_and_minutes_period(self): + self._nonempty_subcase( + { + 'now': datetime_to_epoch(datetime.datetime(2016, 9, 1, 12, 0, 0)), + 'start': datetime.datetime(2016, 9, 1, 11, 0, 0), + 'minutes_back': 24 * 60, + 'minutes_forward': 0, + 'minutes_interval': 3, + }, + (datetime.datetime(2016, 9, 1, 11, 0), datetime.datetime(2016, 9, 1, 11, 57, 0)), + [ + 'CommonDateMinuteTask(dh=2016-09-01T1100)', + 'CommonDateMinuteTask(dh=2016-09-01T1103)', + 'CommonDateMinuteTask(dh=2016-09-01T1106)', + 'CommonDateMinuteTask(dh=2016-09-01T1109)', + 'CommonDateMinuteTask(dh=2016-09-01T1112)', + 'CommonDateMinuteTask(dh=2016-09-01T1115)', + 'CommonDateMinuteTask(dh=2016-09-01T1118)', + ], + { + 'event.tools.range.delay': [ + ('CommonDateMinuteTask', 20), # First missing is the 20th + ], + 'event.tools.range.complete.count': [ + ('CommonDateMinuteTask', 13), # 20 intervals - 7 missing + ], + 'event.tools.range.complete.fraction': [ + ('CommonDateMinuteTask', 13. / (13 + 7)), # (exptected - missing) / expected + ], + } + ) + + def test_start_long_before_minutes_back(self): + self._nonempty_subcase( + { + 'now': datetime_to_epoch(datetime.datetime(2000, 1, 1, 0, 3, 0)), + 'start': datetime.datetime(1960, 1, 1, 0, 0, 0), + 'minutes_back': 5, + 'minutes_forward': 20, + 'minutes_interval': 5, + }, + (datetime.datetime(2000, 1, 1, 0, 0), datetime.datetime(2000, 1, 1, 0, 20, 0)), + [ + 'CommonDateMinuteTask(dh=2000-01-01T0000)', + 'CommonDateMinuteTask(dh=2000-01-01T0005)', + 'CommonDateMinuteTask(dh=2000-01-01T0010)', + 'CommonDateMinuteTask(dh=2000-01-01T0015)', + 'CommonDateMinuteTask(dh=2000-01-01T0020)', + ], + { + 'event.tools.range.delay': [ + ('CommonDateMinuteTask', 5), # because of short minutes_back we're oblivious to those 40 preceding years + ], + 'event.tools.range.complete.count': [ + ('CommonDateMinuteTask', 4207680), # expected intervals - missing. + ], + 'event.tools.range.complete.fraction': [ + ('CommonDateMinuteTask', 4207680. / 4207685), # (expected - missing) / expected + ], + } + ) + + def test_start_after_long_minutes_back(self): + self._nonempty_subcase( + { + 'now': datetime_to_epoch(datetime.datetime(2014, 3, 20, 18, 4, 29)), + 'start': datetime.datetime(2014, 3, 20, 17, 10), + 'task_limit': 4, + 'minutes_back': 365 * 24 * 60, + 'minutes_interval': 5, + }, + (datetime.datetime(2014, 3, 20, 17, 10, 0), datetime.datetime(2014, 3, 20, 18, 0, 0)), + [ + 'CommonDateMinuteTask(dh=2014-03-20T1710)', + 'CommonDateMinuteTask(dh=2014-03-20T1715)', + 'CommonDateMinuteTask(dh=2014-03-20T1720)', + 'CommonDateMinuteTask(dh=2014-03-20T1725)', + ], + { + 'event.tools.range.delay': [ + ('CommonDateMinuteTask', 11), + ], + 'event.tools.range.complete.count': [ + ('CommonDateMinuteTask', 4), + ], + 'event.tools.range.complete.fraction': [ + ('CommonDateMinuteTask', 4. / 11), + ], + } + ) + + def test_start_long_before_long_minutes_back_and_with_long_minutes_forward(self): + self._nonempty_subcase( + { + 'now': datetime_to_epoch(datetime.datetime(2017, 3, 22, 20, 4, 29)), + 'start': datetime.datetime(2011, 3, 20, 17, 10, 0), + 'task_limit': 4, + 'minutes_back': 365 * 24 * 60, + 'minutes_forward': 365 * 24 * 60, + 'minutes_interval': 5, + }, + (datetime.datetime(2016, 3, 22, 20, 5), datetime.datetime(2018, 3, 22, 20, 0)), + [ + 'CommonDateMinuteTask(dh=2016-03-22T2005)', + 'CommonDateMinuteTask(dh=2016-03-22T2010)', + 'CommonDateMinuteTask(dh=2016-03-22T2015)', + 'CommonDateMinuteTask(dh=2016-03-22T2020)', + ], + { + 'event.tools.range.delay': [ + ('CommonDateMinuteTask', 210240), + ], + 'event.tools.range.complete.count': [ + ('CommonDateMinuteTask', 737020), + ], + 'event.tools.range.complete.fraction': [ + ('CommonDateMinuteTask', 737020. / (737020 + 7)), + ], + } + ) + + class FilesystemInferenceTest(unittest.TestCase): def _test_filesystems_and_globs(self, datetime_to_task, datetime_to_re, expected): @@ -493,6 +767,15 @@ def test_datehour_glob_successfully_inferred(self): ] ) + def test_dateminute_glob_successfully_inferred(self): + self._test_filesystems_and_globs( + lambda d: CommonDateMinuteTask(d), + lambda d: d.strftime('(%Y).*(%m).*(%d).*(%H).*(%M)'), + [ + (MockFileSystem, '/n2000y01a05n/[0-9][0-9][0-9][0-9]_[0-9][0-9]-_-[0-9][0-9]aww/21mm[0-9][0-9][0-9][0-9]dara21'), + ] + ) + def test_wrapped_datehour_globs_successfully_inferred(self): self._test_filesystems_and_globs( lambda d: CommonWrapperTask(d), @@ -724,6 +1007,127 @@ def test_missing_directory(self): self.assertEqual([str(t) for t in task.requires()], expected) +class RangeByMinutesTest(unittest.TestCase): + + # fishy to mock the mock, but MockFileSystem doesn't support globs yet + @mock.patch('luigi.mock.MockFileSystem.listdir', new=mock_listdir(mock_contents)) + @mock.patch('luigi.mock.MockFileSystem.exists', + new=mock_exists_always_true) + def test_missing_tasks_correctly_required(self): + expected_tasks = [ + 'SomeByMinutesTask(d=2016-03-31T0000)', + 'SomeByMinutesTask(d=2016-03-31T0005)', + 'SomeByMinutesTask(d=2016-03-31T0010)'] + + class SomeByMinutesTask(luigi.Task): + d = luigi.DateMinuteParameter() + + def output(self): + return MockTarget(self.d.strftime('/data/2014/p/v/z/%Y_/_%m-_-%doctor/20/%HZ%MOOO')) + + for task_path in task_a_paths: + MockTarget(task_path) + # this test takes a few seconds. Since stop is not defined, + # finite_datetimes constitute many years to consider + task = RangeByMinutes(now=datetime_to_epoch(datetime.datetime(2016, 4, 1)), + of=SomeByMinutesTask, + start=datetime.datetime(2014, 3, 20, 17), + task_limit=3, + minutes_back=24 * 60, + minutes_interval=5) + actual = [str(t) for t in task.requires()] + self.assertEqual(actual, expected_tasks) + + @mock.patch('luigi.mock.MockFileSystem.listdir', new=mock_listdir(mock_contents)) + @mock.patch('luigi.mock.MockFileSystem.exists', + new=mock_exists_always_true) + def test_missing_wrapper_tasks_correctly_required(self): + expected_wrapper = [ + 'CommonWrapperTaskMinutes(dm=2014-03-20T2300)', + 'CommonWrapperTaskMinutes(dm=2014-03-20T2305)', + 'CommonWrapperTaskMinutes(dm=2014-03-20T2310)', + 'CommonWrapperTaskMinutes(dm=2014-03-20T2315)'] + task = RangeByMinutes( + now=datetime_to_epoch(datetime.datetime(2040, 4, 1, 0, 0, 0)), + of=CommonWrapperTaskMinutes, + start=datetime.datetime(2014, 3, 20, 23, 0, 0), + stop=datetime.datetime(2014, 3, 20, 23, 20, 0), + minutes_back=30 * 365 * 24 * 60, + minutes_interval=5) + actual = [str(t) for t in task.requires()] + self.assertEqual(actual, expected_wrapper) + + def test_bulk_complete_correctly_interfaced(self): + class BulkCompleteByMinutesTask(luigi.Task): + dh = luigi.DateMinuteParameter() + + @classmethod + def bulk_complete(cls, parameter_tuples): + return parameter_tuples[:-2] + + def output(self): + raise RuntimeError("Shouldn't get called while resolving deps via bulk_complete") + + task = RangeByMinutes(now=datetime_to_epoch(datetime.datetime(2015, 12, 1)), + of=BulkCompleteByMinutesTask, + start=datetime.datetime(2015, 11, 1), + stop=datetime.datetime(2015, 12, 1), + minutes_interval=5) + + expected = [ + 'BulkCompleteByMinutesTask(dh=2015-11-30T2350)', + 'BulkCompleteByMinutesTask(dh=2015-11-30T2355)', + ] + + actual = [str(t) for t in task.requires()] + self.assertEqual(actual, expected) + + def test_bulk_complete_of_params(self): + class BulkCompleteByMinutesTask(luigi.Task): + non_positional_arbitrary_argument = luigi.Parameter(default="whatever", positional=False, significant=False) + dh = luigi.DateMinuteParameter() + arbitrary_argument = luigi.BoolParameter() + + @classmethod + def bulk_complete(cls, parameter_tuples): + for t in map(cls, parameter_tuples): + assert t.arbitrary_argument + return parameter_tuples[:-2] + + def output(self): + raise RuntimeError("Shouldn't get called while resolving deps via bulk_complete") + + task = RangeByMinutes(now=datetime_to_epoch(datetime.datetime(2015, 12, 1)), + of=BulkCompleteByMinutesTask, + of_params=dict(arbitrary_argument=True), + start=datetime.datetime(2015, 11, 1), + stop=datetime.datetime(2015, 12, 1), + minutes_interval=5) + + expected = [ + 'BulkCompleteByMinutesTask(dh=2015-11-30T2350, arbitrary_argument=True)', + 'BulkCompleteByMinutesTask(dh=2015-11-30T2355, arbitrary_argument=True)', + ] + + actual = [str(t) for t in task.requires()] + self.assertEqual(actual, expected) + + @mock.patch('luigi.mock.MockFileSystem.exists', + new=mock_exists_always_false) + def test_missing_directory(self): + task = RangeByMinutes(now=datetime_to_epoch( + datetime.datetime(2014, 3, 21, 0, 0)), + of=TaskMinutesC, + start=datetime.datetime(2014, 3, 20, 23, 11), + stop=datetime.datetime(2014, 3, 20, 23, 21), + minutes_interval=5) + self.assertFalse(task.complete()) + expected = [ + 'TaskMinutesC(dm=2014-03-20T2315)', + 'TaskMinutesC(dm=2014-03-20T2320)'] + self.assertEqual([str(t) for t in task.requires()], expected) + + class RangeInstantiationTest(LuigiTestCase): def test_old_instantiation(self):